Flink基本篇2
第五章 Flink运行时架构
概述
通常,我们只需要使用getExecutionEnvironment(),因为这将根据上下文执行正确的操作:如果你是在IDE中执行程序或作为常规Java程序执行,它将创建一个本地环境,该环境将在本地计算机上执行您的程序。如果您是从程序创建的JAR文件,并通过命令行调用它,则Flink集群管理器将执行您的main方法,而getExecutionEnvironment()将返回用于在集群上执行程序的执行环境。
Task Slot就是任务槽,就是执行任务所需的最小的一块资源。
flink客户端并不是运行时系统的一部分,只是起了一个提交作业的作用。
对于运行时系统而言,主要就由job manager和task manager组成。
作业管理器
控制一个应用程序执行的主进程,是Flink集群中任务管理和调度的核心。
Job master
是Job manager中最核心的组件,一个job 对应于一个job master。
在作业提交时,Job Master会先接收到要执行的应用。一般是由客户端提交来的,包括jar包,数据流图(dataflow graph)和作业图(job graph)。
job master会把jobGraph转换成一个物理层面的数据流图,这个图被称为执行图,它包含了所有可以并发执行的任务,job master会向资源管理器发出请求,申请执行任务必要的资源,一旦获得了足够的资源,就会将执行图分发到真正运行他们的task manager上。
资源管理器
资源管理器主要负责资源的分配和管理,在flink集群中只有一个。所谓资源就是task manager的任务槽,任务槽就是flink集群的资源调配单元,包含了机器用来执行作业的一组CPU和内存资源。每一个任务task都需要分配到一个slot上执行。
任务管理器 task manager
flink的工作进程,通常在flink集群中会有多个task manager运行,每一个task manager都包含了一定数量的slots。slots的数量限制了task manager能够并行处理的任务数量。
作业提交流程
standalone模式
yarn会话模式
task manager跑在容器里
思考
一个流处理程序,到底包含多少个任务?
最终执行任务,需要占用多少slot?
所有flink程序都是由三部分组成的:source、transformation和sink
source负责读取数据源,transformation利用各种算子进行处理加工,sink负责输出。
一些重要概念
并行度
每一个算子(operator)可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行(在不同的task manager slot中执行)。
一个特定算子的子任务的个数被称之为这个算子的并行度。
一个作业里面的不同算子可以设置不同的并行度。并行度是针对每个特定算子而言的。
env也可以设置并行度,表示全局的所有算子的并行度。
如果在env设置了并行度,在后面的算子处又设置了并行度,那么优先级是算子本身的并行度优先级最高。
优先级:
每个算子的并行度设置 > env环境的并行度设置 > 作业提交的并行度设置
一个流处理程序,可以认为是一个作业,一个作业提交后,不等于一个任务,而是多个子任务,每一个任务会到task manager的一个slot执行。具体是多少子任务,这取决于并行度的设置。
数据传输形式
One-To-One
stream维护着分区以及元素的顺序,比如source和map之间,这意味着map算子的子任务看到的元素的个数以及顺序和source算子的子任务生产的元素的个数、顺序相同。map、filter、flatmap都是One-To-One的关系。
算子链怎么合并:
前后发生的两个算子之间是One-To-One的关系,且算子的并行度相同,那么可以合并成一个算子链,在拓扑图中可以看到。可以节省开销。
执行图
flink中的执行图可以分为四层:
- StreamGraph
- JobGraph
- ExecutionGraph
- 物理执行图
StreamGraph:是根据用户通过Stream API编写的代码生成的最初的图,用来表示程序的拓扑结构。
JobGraph:StreamGraph经过优化后,生成了JobGraph,提交给JobManager的数据结构,主要的优化为,将多个符合条件的节点chain在一起作为一个节点(合并成算子链),节省开销。
ExecutionGraph:JobManager根据JobGraph生成的图,ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
任务和任务槽
flink中每一个task manager都是一个JVM进程,为了控制一个task manager能接收多少task,task manager通过task slot来进行控制(一个task manager至少有一个slot)
slot是对资源的一种划分。
slot的隔离主要是针对内存。
配置task manager的slot数量的时候,建议按cpu的核心数量来配置,一个slot分配给一个核,这样CPU就不用分时复用了。
为什么5个任务,用两个slot就跑起来了?
是因为在flink里面,任务之间默认是可以进行slot共享的。
同一个算子的并行子任务,在不同的slot执行;不同算子的任务,有先后顺序的任务,可以共用同一个slot
这样的结果是,一个slot可以保存作业的整个管道。
要配置slot数量和并行度,保证资源的充分利用。
第六章 Datastream API
flink新版本已经实现了流批一体,DataSet API将被弃用,官方推荐统一使用Datastream API来处理流数据和批数据。
Datastream本身是一个flink中用来表示数据集合的类,我们编写的flink代码其实就是基于这种数据类型的处理。我们用一系列API来处理这个集合中的数据,这就叫做数据流的transformations。
一个flink程序,其实就是对Datastream的各种转换,具体来说,代码基本上都由以下几部分构成
- 获取执行环境
- 读取数据源
- 定义基于数据的转换操作
- 定义计算结果的输出位置
- 触发程序执行
执行环境
创建执行环境
最简单的方式,就是直接调用getExecutionEnvironment方法,它会根据当前运行的上下文直接得到正确的结果。如果程序是独立运行的,就返回一个本地执行环境;如果是打包成了jar包,然后提交到集群执行,那么就返回集群执行环境。也就是说这个方法会根据当前运行的方式,自行决定该返回什么样的执行环境。
在获取到执行环境后,我们可以对执行环境进行设置,比如可以设置全局并行度,禁用算子链等。
源算子
首要任务就是把数据读进来。
用来读取数据源的算子就叫源算子。source是整个处理程序的输入端。
flink代码中通用的添加source的方式,就是调用执行环境的addSource方法
在实际工作中,我们基本不会碰到很简单的数据类型,不会有很简单的应用需求。需要处理的数据往往是具有多个字段的数据类型。所以我们一般情况会把处理数据包装成元组类型或者说POJO对象,方便后面的处理和转换。
为了更好地理解,先构建一个实际应用场景,比如网站的访问操作,可以抽象成一个三元组(用户名、用户访问的url,用户访问url的时间戳),所以在这里,我们可以创建一个类Event,将用户行为包装成一个对象。这个对象包含上述的三个字段。
准备工作
准备POJO
package com.atguigu.wc.pojo.diskData; import java.sql.Timestamp; public class Event { public String user; public String url; public Long timestamp; public Event() { } public Event(String user, String url, Long timestamp) { this.user = user; this.url = url; this.timestamp = timestamp; } public String getUser() { return user; } public void setUser(String user) { this.user = user; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public Long getTimestamp() { return timestamp; } public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } @Override public String toString() { return "Event{" + "user='" + user + '\'' + ", url='" + url + '\'' + ", timestamp=" + new Timestamp(timestamp) + '}'; } }
定义的POJO,有这样几个特点:
- 类是公有的
- 有一个无参的构造方法
- 所有属性都是公有的
- 所有属性的类型都是可以序列化的
flink会把这样的类作为一种特殊的POJO数据类型来对待,方便数据的解析和序列化。另外我们还在类中重写了toString 方法,主要是为了测试输出显示更清晰。
读取数据
根据上面的知识已经知道,一个main启动类里面,source算子可以有多个,也就是数据来源可以有多个。
public class SourceTest {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1. 从文件中读取数据,readTextFile,在实际工作中也很常用,读取日志文件中的数据
// 当然结合flume + kafka,可以将日志文件中的数据做清洗,生产到kafka中,将kafka作为flink的数据源。
DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");
//2. 从集合中读取数据
List<Integer> nums = new ArrayList<>();
nums.add(2);
nums.add(5);
// DataStreamSource泛型里面的类型就是我们当前传入的数据类型
DataStreamSource<Integer> numStream = env.fromCollection(nums);
List<Event> events = new ArrayList<>();
events.add(new Event("Mary", "./home", 1000L));
events.add(new Event("Bob", "./cart", 2000L));
DataStreamSource<Event> stream2 = env.fromCollection(events);
// 3. 从元素读取数据
DataStreamSource<Event> stream3 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
//4. 从socket文本流读取
DataStreamSource<String> stream4 = env.socketTextStream("192.168.14.131", 7777);
// stream1.print("1");
// numStream.print("nums");
// stream2.print("2");
stream4.print("socket");
env.execute();
}
}
从kafka读取数据
上面读取的数据都是有界流,从内存列表读,从文件读,都是有界流。从socket读适合吞吐量不大,做测试的时候。
对于真正的流数据,实际项目应该怎样读取呢?
kafka作为分布式消息队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰与流处理是一致的,所以说kafka和flink可以说是天生一对。
在如今的实时流处理应用中,由kafka进行数据的收集和传输,flink进行分析计算,这样的架构已成为众多企业的首选。
需要引入kafka连接器
<!--kafka-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxxxxx:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
env.addSource(new FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
自定义source
创建自定义的数据源,实现SourceFunction接口,主要重写两个关键方法:run()和cancel()
run():
使用运行时上下文对象(SourceContext)向下游发送数据
run方法里面设置一个标志位
cancel()
取消当前作业,将标志位改为false
定义一个随机生成数据的sourceFuntion,注意要指定泛型,也就是数据源的类型
public class ClickSource implements SourceFunction<Event> {
// 声明一个标志位
private Boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
// 定义随机生成数据
Random random = new Random();
// 定义生成随机数据选取的字段
String[] users = {"Marry", "Alice", "Bob", "Cary"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=10"};
// 循环生成数据
while (running) {
String user = users[random.nextInt(users.length)];
String url = urls[random.nextInt(urls.length)];
Long timestamp = Calendar.getInstance().getTimeInMillis();
ctx.collect(new Event(user, url, timestamp));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
//main
public class SourceCustomTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 自定义source function
DataStreamSource<Event> customDataStream = env.addSource(new ClickSource());
customDataStream.print();
env.execute();
}
}
这种用户自定义数据源的方法,可以很好地用于测试。
如果自定义的sourceFunction不是并行sourceFunction,则不能设置source算子的并行度,对于非并行sourceFunction的并行度,只能设置为1.
自定义并行source
// 实现自定义的并行sourceFunction
public static class ParallelCustomSource implements ParallelSourceFunction<Integer> {
private Boolean running = true;
private Random random = new Random();
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
ctx.collect(random.nextInt());
}
}
@Override
public void cancel() {
running = false;
}
}
对于这个自定义的并行数据源就可以给source设置并行度了。
flink支持的数据类型
flink支持的数据类型有Java所有的基本类型及其包装类,再加上void,String,Date,BigDecimal和BigInteger
支持数组类型,包括基本类型数组和对象数组
复合数据类型
- Java元组类型(Tuple0-Tuple25)
- Scala样例类及Scala元组
- 行类型,可以认为是具有任意个字段的元组
- POJO
flink对POJO类型要求如下:
- 类是public的且独立的,没有非静态的内部类。
- 类有一个公共的无参构造方法
- 类中所有字段是public且非final的,可以直接访问到;或者有公告的getter和setter方法。
flink还支持List、Map等辅助类型。
在这些类型中,元组类型和POJO类型最灵活,因为他们支持创建复杂类型,而相比之下,POJO还支持在键的定义中直接使用字段名,所以在实际项目中,往往会将流处理程序中的元素类型定义为Flink中的POJO类型。
转换算子(Transformation)
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream,一个Flink程序的核心,其实就是所有的转换操作,他们决定了处理的业务逻辑。
可以针对一条流进行转换处理,也可以进行分流、合流等多流转换操作。
map
public class TransformMapTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
SingleOutputStreamOperator<String> result = stream.map(new MyMapper());
result.print();
env.execute();
}
// 自定义map function
public static class MyMapper implements MapFunction<Event, String> {
@Override
public String map(Event event) throws Exception {
return event.user;
}
}
}
filter
顾名思义就是对数据流做一个过滤操作,通过一个布尔条件表达式设置一个过滤条件,对每一个流内元素进行判断。
filter转换需要传入的参数需要实现FilterFunction接口。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
SingleOutputStreamOperator<Event> result = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event event) throws Exception {
if (Objects.equals(event.user, "Mary")) {
return true;
}
return false;
}
});
result.print();
env.execute();
}
flatMap
可以看作前面两步操作的结合。
主要是将数据流中的整体(一般是集合类型)拆分成一个个的个体使用,可以产生0到多个元素。
flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型可以与原数据流相同,也可以不同。
FlatMapFunction泛型两个参数分别是输入数据的数据类型和输出数据的数据类型
public class TransformFlatMapTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
stream.flatMap(new MyFlatMap()).print();
env.execute();
}
public static class MyFlatMap implements FlatMapFunction<Event, String> {
@Override
public void flatMap(Event event, Collector<String> out) throws Exception {
out.collect(event.user);
out.collect(event.url);
out.collect(event.timestamp.toString());
}
}
}
聚合算子
聚合操作,计算的结果不仅依赖于当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并,这就是aggregation,也对应着MapReduce中的Reduce。
按键分区-keyBy
对于Flink而言,DataStream是没有直接进行聚合的API的,因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。
所以在Flink中要做聚合,先要进行分区,这个操作就是通过keyBy来完成的。
这里所说的分区是逻辑上的分区。基于key分区的数据有可能被划分到物理上不同的分区,即不同task manager的不同slot。
flink的keyBy分区,是基于key的哈希值。所以这里如果key是POJO的话,那么必须重写POJO的hashCode()方法。
有很多不同的方法来指定key,比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称;另外,还可以传入Lambda表达式,或者实现一个键选择器KeySelector,用于说明从数据提取key的逻辑。
简单聚合
public class TransformSimpleAggTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Bob", "./prod?id=1", 3300L), new Event("Bob", "./home", 4000L), new Event("Bob", "./prod?id=2", 3500L), new Event("Bob", "./prod?id=3", 3800L) ); //按键分组之后进行聚合, 提取用户最近一次访问数据 // 我们要选择user作为key,那么key类型就是String SingleOutputStreamOperator<Event> result = stream.keyBy(new KeySelector<Event, String>() { @Override public String getKey(Event event) throws Exception { return event.user; } }).max("timestamp"); SingleOutputStreamOperator<Event> result1 = stream.keyBy(data -> data.user).maxBy("timestamp"); result.print("max:"); result1.print("maxBy:"); env.execute(); } }
max只针对指定的字段,maxBy则会输出整条数据,也包括其他字段。
简单聚合可以分为两步:1.keyBy;2.聚合
归约聚合
T reduce(T var1, T var2) throws Exception;
var1表示已经归约好的结果,这就是有状态的处理。
public class TransformReduceTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Bob", "./prod?id=1", 3300L), new Event("Bob", "./home", 4000L), new Event("Bob", "./prod?id=2", 3500L), new Event("Bob", "./prod?id=3", 3800L) ); // 统计最活跃的用户是谁 // 1. 统计每个用户的访问频次 SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(Event event) throws Exception { return Tuple2.of(event.user, 1L); } }).keyBy(data -> data.f0) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { // 把两个要归约的值叠加起来,那么就是把value1和value2的第二个字段叠加起来 return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }); //2. 根据统计的访问频次,选择最活跃的用户 SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key") .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { if (value1.f1 > value2.f1) { return value1; } return value2; } }); result.print(); env.execute(); } }
用户自定义函数
富函数类
所有的Flink函数类都有其Rich版本,富函数类一般是以抽象类的形式出现的。与常规函数类的不同主要在于,富函数类可以获取运行函数的上下文,并拥有一些生命周期方法。
生命周期的概念在编程中其实非常重要。比如对象的生命周期,涉及垃圾回收、内存管理等。
RichFunction有生命周期的概念,典型的生命周期方法有:
open()
当前任务实例被创建的时候首先会调用的方法。
是RichFunction的初始化方法,也就是会开启一个算子的生命周期。**当一个算子的实际工作方法例如map()或者filter()被调用之前,open()会被首先调用。**所以像文件IO的创建,数据库连接的创建,配置文件读取等这样的操作,适合在open()方法中完成。
close()
生命周期的最后一个方法,用来做一些清理和资源释放工作。
继承富函数类,除了实际工作方法需要重写之外,也要写生命周期方法。
生命周期方法只会被调用一次,实际工作方法,每次数据到来都会被调用。
public static class MyRichMapper extends RichMapFunction<Event, Integer> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("open生命周期被调用,第" + getRuntimeContext().getIndexOfThisSubtask() + "号任务启动");
}
@Override
public Integer map(Event event) throws Exception {
return event.url.Length();
}
@Override
public void close() throws Exception {
super.close();
System.out.println("close生命周期被调用,第" + getRuntimeContext().getIndexOfThisSubtask() + "号任务close");
}
}
ChatGpt keyBy分流再合流的用法
flink数据流中的数据,类型是json字符串,有两个字段,一个是type,一个是data,我想根据type的类型分别对数据流的data进行不同的处理,处理完之后再合流,应该怎么写?
参考:
您可以使用Flink的keyBy和process函数来实现您的需求。首先,您需要使用keyBy函数将数据流分区为具有相同类型的记录。然后,您可以使用process函数来处理每个记录并将其发送到下一个算子以进行合并。以下是一个示例代码:
DataStream<String> jsonDataStream = ... // your JSON data stream
// keyBy type field
KeyedStream<Tuple2<String, String>, String> keyedStream = jsonDataStream
.map(jsonStr -> {
// parse JSON string into Tuple2<String, String>
JSONObject jsonObj = new JSONObject(jsonStr);
String type = jsonObj.getString("type");
String data = jsonObj.getString("data");
return Tuple2.of(type, data);
})
.keyBy(tuple -> tuple.f0);
// process data based on type
DataStream<String> processedDataStream = keyedStream
.process(new ProcessFunction<Tuple2<String, String>, String>() {
@Override
public void processElement(Tuple2<String, String> tuple, Context context, Collector<String> collector) throws Exception {
String type = tuple.f0;
String data = tuple.f1;
// process data based on type
if (type.equals("type1")) {
// process data for type1
String processedData = processDataForType1(data);
collector.collect(processedData);
} else if (type.equals("type2")) {
// process data for type2
String processedData = processDataForType2(data);
collector.collect(processedData);
} else {
// unknown type
System.out.println("Unknown type: " + type);
}
}
});
// define your own processDataForType1 and processDataForType2 functions
private String processDataForType1(String data) {
// process data for type1
return data.toUpperCase();
}
private String processDataForType2(String data) {
// process data for type2
return data.toLowerCase();
}
在上面的代码中,我们首先将JSON字符串转换为具有类型和数据字段的元组。然后,我们使用keyBy函数将数据流分区为具有相同类型的记录。最后,我们使用process函数来处理每个记录并将其发送到下一个算子以进行合并。在process函数中,我们根据类型字段处理数据,并将处理后的数据发送到下一个算子以进行合并。
flink数据流中的数据,类型是json字符串,有两个字段,一个是type,一个是data,我想根据type的类型分别对数据流的data进行不同的处理,这里的处理包括对data的某些字段按事件时间进行聚合操作,处理完之后再合流,应该怎么写?
如果您需要根据JSON数据的type字段对数据流中的数据进行不同的处理,并按事件时间对数据进行聚合操作,您可以使用Flink的keyBy函数将数据流分区为具有相同类型的记录,并使用window函数和reduce函数对每个窗口中的数据进行聚合操作。接下来,您可以使用union函数将处理后的数据流合并为一个数据流。以下是一个示例代码:
Copy
// POJO class for JSON data
public class Data {
public String type;
public String data;
public Data() {}
public Data(String type, String data) {
this.type = type;
this.data = data;
}
}
DataStream<String> jsonDataStream = ... // your JSON data stream
// map JSON string to Data object
DataStream<Data> dataStream = jsonDataStream
.map(jsonStr -> {
// parse JSON string into Data object
JSONObject jsonObj = new JSONObject(jsonStr);
String type = jsonObj.getString("type");
String data = jsonObj.getString("data");
return new Data(type, data);
});
// keyBy type field
KeyedStream<Data, String> keyedStream = dataStream
.keyBy(data -> data.type);
// window and reduce
DataStream<Data> processedDataStream = keyedStream
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new ReduceFunction<Data>() {
@Override
public Data reduce(Data data1, Data data2) throws Exception {
// parse JSON string into JSONObject
JSONObject jsonObj1 = new JSONObject(data1.data);
JSONObject jsonObj2 = new JSONObject(data2.data);
// aggregate fields based on event time
double field1 = Math.max(jsonObj1.getDouble("field1"), jsonObj2.getDouble("field1"));
int field2 = jsonObj1.getInt("field2") + jsonObj2.getInt("field2");
// add more fields as needed
// create new JSON object
JSONObject newJsonObj = new JSONObject();
newJsonObj.put("field1", field1);
newJsonObj.put("field2", field2);
// add more fields as needed
// create new Data object
Data newData = new Data(data1.type, newJsonObj.toString());
return newData;
}
});
// union processed data streams
DataStream<Data> unionedDataStream = processedDataStream1
.union(processedDataStream2)
.union(processedDataStream3)
// add more data streams as needed
.keyBy(data -> data.type)
.process(new ProcessFunction<Data, Data>() {
@Override
public void processElement(Data data, Context context, Collector<Data> collector) throws Exception {
collector.collect(data);
}
});
在上面的代码中,我们首先将JSON字符串转换为Data对象。然后,我们使用keyBy函数将数据流分区为具有相同类型的记录。接下来,我们使用window函数和reduce函数对每个窗口中的数据进行聚合操作。在reduce函数中,我们首先将JSON字符串转换为JSONObject,然后根据事件时间进行聚合操作,并创建新的JSON对象和Data对象。最后,我们使用union函数将处理后的数据流合并为一个数据流,并使用process函数将数据发送到下一个算子以进行处理。
上面的代码,不同type的data聚合时间不同,有些data需要1小时聚合一次,有些data需要4小时聚合一次,能否写详细一些
如果不同type的data需要根据不同的时间窗口进行聚合操作,您可以使用Flink的assignTimestampsAndWatermarks函数为每个记录分配事件时间,并使用keyBy函数将数据流分区为具有相同类型的记录。接下来,您可以使用window函数和reduce函数对每个窗口中的数据进行聚合操作。在window函数中,您可以使用EventTimeSessionWindows或EventTimeProcessingTimeWindows等窗口类型,并根据数据的类型和事件时间选择不同的时间窗口长度。以下是示例代码:
Copy
// POJO class for JSON data
public class Data {
public String type;
public String data;
public Data() {}
public Data(String type, String data) {
this.type = type;
this.data = data;
}
}
DataStream<String> jsonDataStream = ... // your JSON data stream
// map JSON string to Data object and assign event time
DataStream<Data> dataStream = jsonDataStream
.map(jsonStr -> {
// parse JSON string into Data object
JSONObject jsonObj = new JSONObject(jsonStr);
String type = jsonObj.getString("type");
String data = jsonObj.getString("data");
// assign event time based on data type
long eventTime = 0;
if (type.equals("type1")) {
eventTime = jsonObj.getLong("timestamp");
} else if (type.equals("type2")) {
eventTime = jsonObj.getLong("timestamp") + 4 * 60 * 60 * 1000; // 4 hours later
} else {
// add more types as needed
}
return new Data(type, data);
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Data>(Time.minutes(1)) {
@Override
public long extractTimestamp(Data data) {
// extract event time from data
JSONObject jsonObj = new JSONObject(data.data);
long eventTime = 0;
if (data.type.equals("type1")) {
eventTime = jsonObj.getLong("timestamp");
} else if (data.type.equals("type2")) {
eventTime = jsonObj.getLong("timestamp") + 4 * 60 * 60 * 1000; // 4 hours later
} else {
// add more types as needed
}
return eventTime;
}
});
// keyBy type field
KeyedStream<Data, String> keyedStream = dataStream
.keyBy(data -> data.type);
// window and reduce
DataStream<Data> processedDataStream = keyedStream
.window(EventTimeSessionWindows.withGap(Time.hours(1)))
.reduce(new ReduceFunction<Data>() {
@Override
public Data reduce(Data data1, Data data2) throws Exception {
// parse JSON string into JSONObject
JSONObject jsonObj1 = new JSONObject(data1.data);
JSONObject jsonObj2 = new JSONObject(data2.data);
// aggregate fields based on event time
double field1 = Math.max(jsonObj1.getDouble("field1"), jsonObj2.getDouble("field1"));
int field2 = jsonObj1.getInt("field2") + jsonObj2.getInt("field2");
// add more fields as needed
// create new JSON object
JSONObject newJsonObj = new JSONObject();
newJsonObj.put("field1", field1);
newJsonObj.put("field2", field2);
// add more fields as needed
// create new Data object
Data newData = new Data(data1.type, newJsonObj.toString());
return newData;
}
});
// union processed data streams
DataStream<Data> unionedDataStream = processedDataStream1
.union(processedDataStream2)
.union(processedDataStream3)
// add more data streams as needed
.keyBy(data -> data.type)
.process(new ProcessFunction<Data, Data>() {
@Override
public void processElement(Data data, Context context, Collector<Data> collector) throws Exception {
collector.collect(data);
}
});
在上面的代码中,我们为每个记录分配了事件时间,并使用keyBy函数将数据流分区为具有相同类型的记录。接下来,我们使用window函数和reduce函数对每个窗口中的数据进行聚合操作。在window函数中,我们使用EventTimeSessionWindows窗口类型,并根据数据的类型和事件时间选择不同的时间窗口长度。例如,对于type1
物理分区
keyBy是逻辑分区,至于分得均不均匀,每个key的数据要到哪一区去,这些是完全无从控制的。
因为按key划分,可能分出很多key,远远超过slot数
物理分区可以控制分区策略,精准调配数据.
例如我们编写的程序,可能对多个处理任务或者说算子设置了不同的并行度,那么当数据执行的上下游任务并行度变化时,数据就不应该在当前分区以直通方式传输了,因为如果并行度变小,当前分区可能没有下游任务了;而如果并行度变大,所有数据还在原先的分区处理就会导致资源的浪费。所以这种情况下,系统会自动地将数据均匀地发往下游所有并行任务,保证各个分区负载均衡。
// 随机分区
stream.shuffle().print().setParallelism(4);
// 轮询分区
stream.rebalance().print().setParallelism(4);
输出算子
flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
建立连接,释放连接,在继承自RichFunction的类里就可以实现这些。
但是一般不这样做。flink把向外部系统写入这步操作抽象提炼成了sink。
dataStream的print()方法,内部也是实现了addSink()
@PublicEvolving
public DataStreamSink<T> print() {
PrintSinkFunction<T> printFunction = new PrintSinkFunction();
return this.addSink(printFunction).name("Print to Std. Out");
}
PrintSinkFunction继承自RichSinkFunction
SinkFunction多数情况下并不需要我们自己实现。
像kafka之类流式系统,flink提供了完美对接,source、sink两端都能连,可读可写,而对于es、文件系统、jdbc等数据存储系统,则只提供了输出写入的sinkFunction
输出到文件
public class SinkToFileTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Bob", "./home", 4000L),
new Event("Bob", "./prod?id=2", 3500L),
new Event("Bob", "./prod?id=3", 3800L)
);
StreamingFileSink<String> fileSink =
StreamingFileSink.<String>forRowFormat(new Path("./output"),
//指定滚动策略
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
//指定文件大小,达到多少就归档保存
.withMaxPartSize(1024 * 1024 * 1024)
//定义隔多长时间滚动一次,开启一个新的文件
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
// 指定多久没有数据到来,文件就归档保存
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.build()
)
.build();
stream.map(Event::toString).addSink(fileSink);
}
}
输出到kafka
kafka是消息队列,处理的也是流式数据,和flink天生一对。
flink与kafka的连接器提供了端到端的精确一次的语义保证,这在实际项目中是最高级别的一致性保证。
public class SinkToKafkaTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//kafka配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxxxx:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
//从Kafka获取数据流
DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
// 读到的是String,要转换成event
SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String[] fields = value.split(",");
return new Event(fields[0], fields[1], Long.valueOf(fields[2].trim())).toString();
}
});
// 结果数据写入kafka
result.addSink(new FlinkKafkaProducer<String>(
"xxxxx:9092", "events", new SimpleStringSchema()
));
env.execute();
}
}
第七章 时间和窗口
在流处理应用中,很重要很常见的操作就是窗口计算。
所谓的窗口一般就是划定的一段时间范围,对这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。
时间语义
事件产生的时间,进入Flink的时间,以及被某个算子处理的时间完全是不一样的。
事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被Flink系统的source算子读取消费,进而向下游的窗口算子传递,最后进行处理计算。
这里有两个非常重要的时间点,一个是数据产生的时间,一个是数据进入窗口真正被处理的时间,这两个时间不一样,我们定义的窗口操作到底以哪种时间作为衡量标准,就是所谓的时间语义。
在计算机系统中,考虑数据处理的时间是没什么意义的,我们更关心的,显然是数据本身产生的时间。所以在实际应用中,事件时间语义会更常见,也就是事件产生时间而不是事件处理时间。一般情况下,业务日志数据处理中都会记录数据生成的时间戳,它就可以作为事件时间判断的基础。
事件时间,也就是数据生成的时间,数据一旦产生,这个时间就确定了,所以它可以作为一个属性嵌入到数据中,这其实就是这条数据的时间戳。
在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在Flink中把它叫做事件时间的水位线(watermarks)。
水位线
在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个逻辑时钟,这个时钟的时间不会自动流逝,它的时间进展,就是靠着新到数据的时间戳来推动的。
这样的好处在于,计算的过程可以完全不依赖于处理时间(系统时间)。
在Flink中,用来衡量事件时间进展的标记,就被称作水位线。
水位线可以看作一条特殊的数据记录,它是插入
到数据流中的一个标记点,主要内容就是一个时间戳
,用来指示当前的事件时间。而它插入流中的位置,就是在某个数据到来之后。
有序流中的水位线
在理想状态下
,数据应该按照他们的先后顺序进入流中,这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长,事件时钟不断向前推进。实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳,插入水位线就做了大量的无用功,而且即使时间戳不同,同时涌来的数据时间差会非常小,往往对处理计算也没什么影响。所以为了提高效率,一般每隔一段时间就会生成一个水位线,这个水位线的时间戳就是当前最新数据的时间戳,所以这时的水位线,其实就是有序流的一个周期性出现的时间标记。
乱序流中的水位线
在分布式系统中,数据在节点中传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的乱序数据。
一个7秒时产生的数据,生成时间自然要比9秒的数据早,但是经过数据缓存和传输之后,处理任务可能先收到了9秒的数据,之后才是7秒的数据,这就是乱序。
解决思路:
还是靠数据来驱动,不依靠真正的系统时间,每来一个时间就提取它的时间戳,插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时间戳大,才能推动时间推进,才插入水位线。
水位线watermark(t),表示在当前流中,事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现小于时间戳t的数据。
设置水位线的方法
assignTimestampsAndWatermarks()
里面要传一个参数,就是watermark的生成策略。
水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,所以是可以考虑在生成水位线之前使用。
public class WatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置生成水位线的时间间隔
env.getConfig().setAutoWatermarkInterval(100);
DataStream<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Bob", "./home", 4000L),
new Event("Bob", "./prod?id=2", 3500L),
new Event("Bob", "./prod?id=3", 3800L)
)
// 有序流的watermark生成
// .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
// .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// @Override
// public long extractTimestamp(Event event, long recordTimestamp) {
// return event.timestamp;
// }
// }));
//乱序流的watermark生成,要指定延迟时间,处理迟到数据
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long recordTimestamp) {
return event.timestamp;
}
}));
env.execute();
}
}
水位线的生成,一定要有水位线的生成器和时间戳的提取器。
窗口
概念
在大数据流式计算中,基于时间最常见的操作就是窗口,最典型的利用时间来控制输出的就是窗口。
我们很容易把窗口想象成一个固定位置的“框”,数据源源不断流过来,到某个时间点窗口该关闭了,就停止收集数据、触发计算并输出结果。
分类
在flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。
按照驱动类型分类
窗口本身是截取有界数据的一种方式,以什么标准来开始和结束数据的截取。
按照时间段去截取数据的窗口就叫做时间窗口,这在实际应用中最常见,除了由时间驱动之外,窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这叫计数窗口。
时间窗口
- 滚动窗口
- 滑动窗口
- 会话窗口
计数窗口
全局窗口
这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中,这种窗口没有结束的时候,默认是不会触发计算的,如果希望它能对数据进行计算处理,还需要自定义触发器trigger
窗口API
按键分区和非按键分区
在定义窗口操作之前,首先需要确定,是基于按键分区的数据流KeyedStream来开窗,还是直接在没有分区的DataStream上开窗,也就是说,在调用窗口算子之前,是否有keyBy操作。
按键分区窗口
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,这就是keyedStream,基于keyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
非按键分区窗口
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流,这时窗口逻辑只能在一个任务上执行,这就相当于并行度为1.所以在实际应用中一般不推荐使用这种方式。
这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。
窗口分配器
定义几种窗口的方法
public class WindowTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置生成水位线的时间间隔
env.getConfig().setAutoWatermarkInterval(100);
DataStream<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Bob", "./home", 4000L),
new Event("Bob", "./prod?id=2", 3500L),
new Event("Bob", "./prod?id=3", 3800L)
)
// 有序流的watermark生成
// .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
// .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// @Override
// public long extractTimestamp(Event event, long recordTimestamp) {
// return event.timestamp;
// }
// }));
//乱序流的watermark生成,要指定延迟时间,处理迟到数据
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long recordTimestamp) {
return event.timestamp;
}
}));
stream.keyBy(data -> data.user)
.countWindow(10, 2);// 计数窗口, 传1个参数就是滚动窗口,传2个参数就是滑动窗口
// .window(EventTimeSessionWindows.withGap(Time.seconds(2)));//事件时间会话窗口
// .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));// 滑动时间窗口,1小时,步长5min
// .window(TumblingEventTimeWindows.of(Time.hours(1))); // 定义1小时的滚动时间窗口
env.execute();
}
}
到这里还没完,还需要用窗口函数,对分配到窗口内的数据做处理。
窗口函数
定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,必须再接上一个定义窗口如何进行计算的操作,这就是窗口函数。
增量聚合函数
窗口将数据收集起来,最基本的处理操作就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是增量聚合。
典型的增量聚合函数有两个:ReduceFunction和AggregateFunction
ReduceFunction:
public class WindowTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置生成水位线的时间间隔 env.getConfig().setAutoWatermarkInterval(100); DataStream<Event> stream = env.fromElements( new Event("Mary", "./home", 1000L), new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Bob", "./prod?id=1", 3300L), new Event("Bob", "./home", 4000L), new Event("Bob", "./prod?id=2", 3500L), new Event("Bob", "./prod?id=3", 3800L) ) // 有序流的watermark生成 // .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps() // .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { // @Override // public long extractTimestamp(Event event, long recordTimestamp) { // return event.timestamp; // } // })); //乱序流的watermark生成,要指定延迟时间,处理迟到数据 .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event event, long recordTimestamp) { return event.timestamp; } })); stream.map(new MapFunction<Event, Tuple2<String, Long>>() { @Override public Tuple2 map(Event event) throws Exception { return Tuple2.of(event.user, 1L); } }) .keyBy(data -> data.f0) // .countWindow(10, 2);// 计数窗口, 传1个参数就是滚动窗口,传2个参数就是滑动窗口 // .window(EventTimeSessionWindows.withGap(Time.seconds(2)));//事件时间会话窗口 // .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));// 滑动时间窗口,1小时,步长5min .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 定义1小时的滚动时间窗口 .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { return Tuple2.of(value1.f0, value1.f1 + value2.f1); } }).print(); env.execute(); } }
AggregateFunction
AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN),累加器类型(ACC)和输出类型(OUT)
接口中有四个方法:
- createAccumulator()
- add()
- getResult()
- merge()
全窗口函数
有些场景下,要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义。
第八章 分流合流
分流
public class SplitStreamTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置生成水位线的时间间隔
env.getConfig().setAutoWatermarkInterval(100);
DataStream<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Bob", "./home", 4000L),
new Event("Bob", "./prod?id=2", 3500L),
new Event("Bob", "./prod?id=3", 3800L)
)
// 有序流的watermark生成
// .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
// .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// @Override
// public long extractTimestamp(Event event, long recordTimestamp) {
// return event.timestamp;
// }
// }));
//乱序流的watermark生成,要指定延迟时间,处理迟到数据
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long recordTimestamp) {
return event.timestamp;
}
}));
// 定义输出标签
OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary");
OutputTag<Tuple3<String, String, Long>> bobTag = new OutputTag<Tuple3<String, String, Long>>("Bob");
SingleOutputStreamOperator<Event> process = stream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(Event event, ProcessFunction<Event, Object>.Context ctx, Collector<Object> out) throws Exception {
if (event.user.equals("Mary")) {
ctx.output(maryTag, Tuple3.of(event.user, event.url, event.timestamp));
} else if (event.user.equals("Bob")) {
ctx.output(bobTag, Tuple3.of(event.user, event.url, event.timestamp));
} else {
out.collect(event);
}
}
});
env.execute();
}
}