Flink基本篇2

第五章 Flink运行时架构

概述

通常,我们只需要使用getExecutionEnvironment(),因为这将根据上下文执行正确的操作:如果你是在IDE中执行程序或作为常规Java程序执行,它将创建一个本地环境,该环境将在本地计算机上执行您的程序。如果您是从程序创建的JAR文件,并通过命令行调用它,则Flink集群管理器将执行您的main方法,而getExecutionEnvironment()将返回用于在集群上执行程序的执行环境。

Task Slot就是任务槽,就是执行任务所需的最小的一块资源。

flink客户端并不是运行时系统的一部分,只是起了一个提交作业的作用。

对于运行时系统而言,主要就由job manager和task manager组成。

作业管理器

控制一个应用程序执行的主进程,是Flink集群中任务管理和调度的核心。

Job master

是Job manager中最核心的组件,一个job 对应于一个job master

在作业提交时,Job Master会先接收到要执行的应用。一般是由客户端提交来的,包括jar包,数据流图(dataflow graph)和作业图(job graph)。

job master会把jobGraph转换成一个物理层面的数据流图,这个图被称为执行图,它包含了所有可以并发执行的任务,job master会向资源管理器发出请求,申请执行任务必要的资源,一旦获得了足够的资源,就会将执行图分发到真正运行他们的task manager上。

资源管理器

资源管理器主要负责资源的分配和管理,在flink集群中只有一个。所谓资源就是task manager的任务槽,任务槽就是flink集群的资源调配单元,包含了机器用来执行作业的一组CPU和内存资源。每一个任务task都需要分配到一个slot上执行。

任务管理器 task manager

flink的工作进程,通常在flink集群中会有多个task manager运行,每一个task manager都包含了一定数量的slots。slots的数量限制了task manager能够并行处理的任务数量。

作业提交流程

standalone模式

image-20230406165402400

yarn会话模式

task manager跑在容器里

image-20230406170036021

思考

一个流处理程序,到底包含多少个任务?

最终执行任务,需要占用多少slot?

所有flink程序都是由三部分组成的:source、transformation和sink

source负责读取数据源,transformation利用各种算子进行处理加工,sink负责输出。

一些重要概念

并行度

每一个算子(operator)可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行(在不同的task manager slot中执行)。

一个特定算子的子任务的个数被称之为这个算子的并行度。

一个作业里面的不同算子可以设置不同的并行度。并行度是针对每个特定算子而言的。

env也可以设置并行度,表示全局的所有算子的并行度。

如果在env设置了并行度,在后面的算子处又设置了并行度,那么优先级是算子本身的并行度优先级最高。

优先级:

每个算子的并行度设置 > env环境的并行度设置 > 作业提交的并行度设置

一个流处理程序,可以认为是一个作业,一个作业提交后,不等于一个任务,而是多个子任务,每一个任务会到task manager的一个slot执行。具体是多少子任务,这取决于并行度的设置。

数据传输形式

One-To-One

stream维护着分区以及元素的顺序,比如source和map之间,这意味着map算子的子任务看到的元素的个数以及顺序和source算子的子任务生产的元素的个数、顺序相同。map、filter、flatmap都是One-To-One的关系。

算子链怎么合并:

前后发生的两个算子之间是One-To-One的关系,且算子的并行度相同,那么可以合并成一个算子链,在拓扑图中可以看到。可以节省开销。

image-20230407154008166

执行图

flink中的执行图可以分为四层:

  • StreamGraph
  • JobGraph
  • ExecutionGraph
  • 物理执行图

StreamGraph:是根据用户通过Stream API编写的代码生成的最初的图,用来表示程序的拓扑结构。

JobGraph:StreamGraph经过优化后,生成了JobGraph,提交给JobManager的数据结构,主要的优化为,将多个符合条件的节点chain在一起作为一个节点(合并成算子链),节省开销。

ExecutionGraph:JobManager根据JobGraph生成的图,ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

任务和任务槽

flink中每一个task manager都是一个JVM进程,为了控制一个task manager能接收多少task,task manager通过task slot来进行控制(一个task manager至少有一个slot)

slot是对资源的一种划分。

slot的隔离主要是针对内存。

配置task manager的slot数量的时候,建议按cpu的核心数量来配置,一个slot分配给一个核,这样CPU就不用分时复用了。

为什么5个任务,用两个slot就跑起来了?

是因为在flink里面,任务之间默认是可以进行slot共享的。

同一个算子的并行子任务,在不同的slot执行;不同算子的任务,有先后顺序的任务,可以共用同一个slot

image-20230407160017643

这样的结果是,一个slot可以保存作业的整个管道。

要配置slot数量和并行度,保证资源的充分利用。

第六章 Datastream API

flink新版本已经实现了流批一体,DataSet API将被弃用,官方推荐统一使用Datastream API来处理流数据和批数据。

Datastream本身是一个flink中用来表示数据集合的类,我们编写的flink代码其实就是基于这种数据类型的处理。我们用一系列API来处理这个集合中的数据,这就叫做数据流的transformations。

一个flink程序,其实就是对Datastream的各种转换,具体来说,代码基本上都由以下几部分构成

  • 获取执行环境
  • 读取数据源
  • 定义基于数据的转换操作
  • 定义计算结果的输出位置
  • 触发程序执行

执行环境

创建执行环境

最简单的方式,就是直接调用getExecutionEnvironment方法,它会根据当前运行的上下文直接得到正确的结果。如果程序是独立运行的,就返回一个本地执行环境;如果是打包成了jar包,然后提交到集群执行,那么就返回集群执行环境。也就是说这个方法会根据当前运行的方式,自行决定该返回什么样的执行环境。

在获取到执行环境后,我们可以对执行环境进行设置,比如可以设置全局并行度,禁用算子链等。

源算子

首要任务就是把数据读进来。

用来读取数据源的算子就叫源算子。source是整个处理程序的输入端。

flink代码中通用的添加source的方式,就是调用执行环境的addSource方法

在实际工作中,我们基本不会碰到很简单的数据类型,不会有很简单的应用需求。需要处理的数据往往是具有多个字段的数据类型。所以我们一般情况会把处理数据包装成元组类型或者说POJO对象,方便后面的处理和转换。

为了更好地理解,先构建一个实际应用场景,比如网站的访问操作,可以抽象成一个三元组(用户名、用户访问的url,用户访问url的时间戳),所以在这里,我们可以创建一个类Event,将用户行为包装成一个对象。这个对象包含上述的三个字段。

准备工作

  1. 准备POJO

    package com.atguigu.wc.pojo.diskData;
    
    import java.sql.Timestamp;
    
    public class Event {
        public String user;
        public String url;
        public Long timestamp;
    
        public Event() {
        }
    
        public Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }
    
        public String getUser() {
            return user;
        }
    
        public void setUser(String user) {
            this.user = user;
        }
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        public Long getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }
    
        @Override
        public String toString() {
            return "Event{" +
                    "user='" + user + '\'' +
                    ", url='" + url + '\'' +
                    ", timestamp=" + new Timestamp(timestamp) +
                    '}';
        }
    }
    

    定义的POJO,有这样几个特点:

    • 类是公有的
    • 有一个无参的构造方法
    • 所有属性都是公有的
    • 所有属性的类型都是可以序列化的

    flink会把这样的类作为一种特殊的POJO数据类型来对待,方便数据的解析和序列化。另外我们还在类中重写了toString 方法,主要是为了测试输出显示更清晰。

读取数据

根据上面的知识已经知道,一个main启动类里面,source算子可以有多个,也就是数据来源可以有多个。

public class SourceTest {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1. 从文件中读取数据,readTextFile,在实际工作中也很常用,读取日志文件中的数据
        // 当然结合flume + kafka,可以将日志文件中的数据做清洗,生产到kafka中,将kafka作为flink的数据源。
        DataStreamSource<String> stream1 = env.readTextFile("input/clicks.txt");

        //2. 从集合中读取数据
        List<Integer> nums = new ArrayList<>();
        nums.add(2);
        nums.add(5);
        // DataStreamSource泛型里面的类型就是我们当前传入的数据类型
        DataStreamSource<Integer> numStream = env.fromCollection(nums);

        List<Event> events = new ArrayList<>();
        events.add(new Event("Mary", "./home", 1000L));
        events.add(new Event("Bob", "./cart", 2000L));
        DataStreamSource<Event> stream2 = env.fromCollection(events);

        // 3. 从元素读取数据
        DataStreamSource<Event> stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //4. 从socket文本流读取
        DataStreamSource<String> stream4 = env.socketTextStream("192.168.14.131", 7777);

//        stream1.print("1");
//        numStream.print("nums");
//        stream2.print("2");
        stream4.print("socket");

        env.execute();
    }
}

从kafka读取数据

上面读取的数据都是有界流,从内存列表读,从文件读,都是有界流。从socket读适合吞吐量不大,做测试的时候。

对于真正的流数据,实际项目应该怎样读取呢?

kafka作为分布式消息队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰与流处理是一致的,所以说kafka和flink可以说是天生一对。

在如今的实时流处理应用中,由kafka进行数据的收集和传输,flink进行分析计算,这样的架构已成为众多企业的首选。

需要引入kafka连接器

    <!--kafka-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxxxxx:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));

自定义source

创建自定义的数据源,实现SourceFunction接口,主要重写两个关键方法:run()和cancel()

  • run():

    使用运行时上下文对象(SourceContext)向下游发送数据

    run方法里面设置一个标志位

  • cancel()

    取消当前作业,将标志位改为false

定义一个随机生成数据的sourceFuntion,注意要指定泛型,也就是数据源的类型

public class ClickSource implements SourceFunction<Event> {
    // 声明一个标志位
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // 定义随机生成数据
        Random random = new Random();
        // 定义生成随机数据选取的字段
        String[] users = {"Marry", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=10"};

        // 循环生成数据
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            ctx.collect(new Event(user, url, timestamp));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

//main
public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 自定义source function
        DataStreamSource<Event> customDataStream = env.addSource(new ClickSource());
        customDataStream.print();
        env.execute();
    }
}

这种用户自定义数据源的方法,可以很好地用于测试。

如果自定义的sourceFunction不是并行sourceFunction,则不能设置source算子的并行度,对于非并行sourceFunction的并行度,只能设置为1.

自定义并行source

    // 实现自定义的并行sourceFunction
    public static class ParallelCustomSource implements ParallelSourceFunction<Integer> {
        private Boolean running = true;
        private Random random = new Random();
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (running) {
                ctx.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

对于这个自定义的并行数据源就可以给source设置并行度了。

flink支持的数据类型

flink支持的数据类型有Java所有的基本类型及其包装类,再加上void,String,Date,BigDecimal和BigInteger

支持数组类型,包括基本类型数组和对象数组

复合数据类型

  • Java元组类型(Tuple0-Tuple25)
  • Scala样例类及Scala元组
  • 行类型,可以认为是具有任意个字段的元组
  • POJO

flink对POJO类型要求如下:

  • 类是public的且独立的,没有非静态的内部类。
  • 类有一个公共的无参构造方法
  • 类中所有字段是public且非final的,可以直接访问到;或者有公告的getter和setter方法。

flink还支持List、Map等辅助类型。

在这些类型中,元组类型和POJO类型最灵活,因为他们支持创建复杂类型,而相比之下,POJO还支持在键的定义中直接使用字段名,所以在实际项目中,往往会将流处理程序中的元素类型定义为Flink中的POJO类型。

转换算子(Transformation)

image-20230410105353234

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream,一个Flink程序的核心,其实就是所有的转换操作,他们决定了处理的业务逻辑。

可以针对一条流进行转换处理,也可以进行分流、合流等多流转换操作。

map

public class TransformMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        SingleOutputStreamOperator<String> result = stream.map(new MyMapper());
        result.print();
        env.execute();
    }

    // 自定义map function
    public static class MyMapper implements MapFunction<Event, String> {
        @Override
        public String map(Event event) throws Exception {
            return event.user;
        }
    }
}

filter

顾名思义就是对数据流做一个过滤操作,通过一个布尔条件表达式设置一个过滤条件,对每一个流内元素进行判断。

filter转换需要传入的参数需要实现FilterFunction接口。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        SingleOutputStreamOperator<Event> result = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                if (Objects.equals(event.user, "Mary")) {
                    return true;
                }
                return false;
            }
        });
        result.print();
        env.execute();
    }

flatMap

可以看作前面两步操作的结合。

主要是将数据流中的整体(一般是集合类型)拆分成一个个的个体使用,可以产生0到多个元素。

flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型可以与原数据流相同,也可以不同。

FlatMapFunction泛型两个参数分别是输入数据的数据类型和输出数据的数据类型

public class TransformFlatMapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        stream.flatMap(new MyFlatMap()).print();
        env.execute();
    }

    public static class MyFlatMap implements FlatMapFunction<Event, String> {

        @Override
        public void flatMap(Event event, Collector<String> out) throws Exception {
            out.collect(event.user);
            out.collect(event.url);
            out.collect(event.timestamp.toString());
        }
    }
}

聚合算子

聚合操作,计算的结果不仅依赖于当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并,这就是aggregation,也对应着MapReduce中的Reduce。

  1. 按键分区-keyBy

    对于Flink而言,DataStream是没有直接进行聚合的API的,因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。

    所以在Flink中要做聚合,先要进行分区,这个操作就是通过keyBy来完成的。

    这里所说的分区是逻辑上的分区。基于key分区的数据有可能被划分到物理上不同的分区,即不同task manager的不同slot。

    flink的keyBy分区,是基于key的哈希值。所以这里如果key是POJO的话,那么必须重写POJO的hashCode()方法。

    image-20230410114233386

    有很多不同的方法来指定key,比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称;另外,还可以传入Lambda表达式,或者实现一个键选择器KeySelector,用于说明从数据提取key的逻辑。

  2. 简单聚合

    public class TransformSimpleAggTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Bob", "./home", 4000L),
                    new Event("Bob", "./prod?id=2", 3500L),
                    new Event("Bob", "./prod?id=3", 3800L)
            );
            //按键分组之后进行聚合, 提取用户最近一次访问数据
            // 我们要选择user作为key,那么key类型就是String
            SingleOutputStreamOperator<Event> result = stream.keyBy(new KeySelector<Event, String>() {
    
                @Override
                public String getKey(Event event) throws Exception {
                    return event.user;
                }
            }).max("timestamp");
    
            SingleOutputStreamOperator<Event> result1 = stream.keyBy(data -> data.user).maxBy("timestamp");
            result.print("max:");
            result1.print("maxBy:");
            env.execute();
        }
    }
    

    max只针对指定的字段,maxBy则会输出整条数据,也包括其他字段。

    简单聚合可以分为两步:1.keyBy;2.聚合

  3. 归约聚合

    T reduce(T var1, T var2) throws Exception;
    

    var1表示已经归约好的结果,这就是有状态的处理。

    public class TransformReduceTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<Event> stream = env.fromElements(
                    new Event("Mary", "./home", 1000L),
                    new Event("Bob", "./cart", 2000L),
                    new Event("Alice", "./prod?id=100", 3000L),
                    new Event("Bob", "./prod?id=1", 3300L),
                    new Event("Bob", "./home", 4000L),
                    new Event("Bob", "./prod?id=2", 3500L),
                    new Event("Bob", "./prod?id=3", 3800L)
            );
            // 统计最活跃的用户是谁
            // 1. 统计每个用户的访问频次
            SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(Event event) throws Exception {
                            return Tuple2.of(event.user, 1L);
                        }
                    }).keyBy(data -> data.f0)
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            // 把两个要归约的值叠加起来,那么就是把value1和value2的第二个字段叠加起来
                            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                        }
                    });
            //2. 根据统计的访问频次,选择最活跃的用户
            SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            if (value1.f1 > value2.f1) {
                                return value1;
                            }
                            return value2;
                        }
                    });
            result.print();
            env.execute();
        }
    }
    

用户自定义函数

富函数类

所有的Flink函数类都有其Rich版本,富函数类一般是以抽象类的形式出现的。与常规函数类的不同主要在于,富函数类可以获取运行函数的上下文,并拥有一些生命周期方法。

生命周期的概念在编程中其实非常重要。比如对象的生命周期,涉及垃圾回收、内存管理等。

RichFunction有生命周期的概念,典型的生命周期方法有:

  • open()

    当前任务实例被创建的时候首先会调用的方法。

    是RichFunction的初始化方法,也就是会开启一个算子的生命周期。**当一个算子的实际工作方法例如map()或者filter()被调用之前,open()会被首先调用。**所以像文件IO的创建,数据库连接的创建,配置文件读取等这样的操作,适合在open()方法中完成。

  • close()

    生命周期的最后一个方法,用来做一些清理和资源释放工作。

继承富函数类,除了实际工作方法需要重写之外,也要写生命周期方法。

生命周期方法只会被调用一次,实际工作方法,每次数据到来都会被调用。

    public static class MyRichMapper extends RichMapFunction<Event, Integer> {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open生命周期被调用,第" + getRuntimeContext().getIndexOfThisSubtask() + "号任务启动");
        }

        @Override
        public Integer map(Event event) throws Exception {
            return event.url.Length();
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("close生命周期被调用,第" + getRuntimeContext().getIndexOfThisSubtask() + "号任务close");
        }
    }

ChatGpt keyBy分流再合流的用法

flink数据流中的数据,类型是json字符串,有两个字段,一个是type,一个是data,我想根据type的类型分别对数据流的data进行不同的处理,处理完之后再合流,应该怎么写?

参考:

您可以使用Flink的keyBy和process函数来实现您的需求。首先,您需要使用keyBy函数将数据流分区为具有相同类型的记录。然后,您可以使用process函数来处理每个记录并将其发送到下一个算子以进行合并。以下是一个示例代码:

DataStream<String> jsonDataStream = ... // your JSON data stream

// keyBy type field
KeyedStream<Tuple2<String, String>, String> keyedStream = jsonDataStream
    .map(jsonStr -> {
        // parse JSON string into Tuple2<String, String>
        JSONObject jsonObj = new JSONObject(jsonStr);
        String type = jsonObj.getString("type");
        String data = jsonObj.getString("data");
        return Tuple2.of(type, data);
    })
    .keyBy(tuple -> tuple.f0);

// process data based on type
DataStream<String> processedDataStream = keyedStream
    .process(new ProcessFunction<Tuple2<String, String>, String>() {
        @Override
        public void processElement(Tuple2<String, String> tuple, Context context, Collector<String> collector) throws Exception {
            String type = tuple.f0;
            String data = tuple.f1;

            // process data based on type
            if (type.equals("type1")) {
                // process data for type1
                String processedData = processDataForType1(data);
                collector.collect(processedData);
            } else if (type.equals("type2")) {
                // process data for type2
                String processedData = processDataForType2(data);
                collector.collect(processedData);
            } else {
                // unknown type
                System.out.println("Unknown type: " + type);
            }
        }
    });

// define your own processDataForType1 and processDataForType2 functions
private String processDataForType1(String data) {
    // process data for type1
    return data.toUpperCase();
}

private String processDataForType2(String data) {
    // process data for type2
    return data.toLowerCase();
}
在上面的代码中,我们首先将JSON字符串转换为具有类型和数据字段的元组。然后,我们使用keyBy函数将数据流分区为具有相同类型的记录。最后,我们使用process函数来处理每个记录并将其发送到下一个算子以进行合并。在process函数中,我们根据类型字段处理数据,并将处理后的数据发送到下一个算子以进行合并。

flink数据流中的数据,类型是json字符串,有两个字段,一个是type,一个是data,我想根据type的类型分别对数据流的data进行不同的处理,这里的处理包括对data的某些字段按事件时间进行聚合操作,处理完之后再合流,应该怎么写?

如果您需要根据JSON数据的type字段对数据流中的数据进行不同的处理,并按事件时间对数据进行聚合操作,您可以使用Flink的keyBy函数将数据流分区为具有相同类型的记录,并使用window函数和reduce函数对每个窗口中的数据进行聚合操作。接下来,您可以使用union函数将处理后的数据流合并为一个数据流。以下是一个示例代码:

Copy
// POJO class for JSON data
public class Data {
    public String type;
    public String data;

    public Data() {}

    public Data(String type, String data) {
        this.type = type;
        this.data = data;
    }
}

DataStream<String> jsonDataStream = ... // your JSON data stream

// map JSON string to Data object
DataStream<Data> dataStream = jsonDataStream
    .map(jsonStr -> {
        // parse JSON string into Data object
        JSONObject jsonObj = new JSONObject(jsonStr);
        String type = jsonObj.getString("type");
        String data = jsonObj.getString("data");
        return new Data(type, data);
    });

// keyBy type field
KeyedStream<Data, String> keyedStream = dataStream
    .keyBy(data -> data.type);

// window and reduce
DataStream<Data> processedDataStream = keyedStream
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new ReduceFunction<Data>() {
        @Override
        public Data reduce(Data data1, Data data2) throws Exception {
            // parse JSON string into JSONObject
            JSONObject jsonObj1 = new JSONObject(data1.data);
            JSONObject jsonObj2 = new JSONObject(data2.data);
            // aggregate fields based on event time
            double field1 = Math.max(jsonObj1.getDouble("field1"), jsonObj2.getDouble("field1"));
            int field2 = jsonObj1.getInt("field2") + jsonObj2.getInt("field2");
            // add more fields as needed
            // create new JSON object
            JSONObject newJsonObj = new JSONObject();
            newJsonObj.put("field1", field1);
            newJsonObj.put("field2", field2);
            // add more fields as needed
            // create new Data object
            Data newData = new Data(data1.type, newJsonObj.toString());
            return newData;
        }
    });

// union processed data streams
DataStream<Data> unionedDataStream = processedDataStream1
    .union(processedDataStream2)
    .union(processedDataStream3)
    // add more data streams as needed
    .keyBy(data -> data.type)
    .process(new ProcessFunction<Data, Data>() {
        @Override
        public void processElement(Data data, Context context, Collector<Data> collector) throws Exception {
            collector.collect(data);
        }
    });
在上面的代码中,我们首先将JSON字符串转换为Data对象。然后,我们使用keyBy函数将数据流分区为具有相同类型的记录。接下来,我们使用window函数和reduce函数对每个窗口中的数据进行聚合操作。在reduce函数中,我们首先将JSON字符串转换为JSONObject,然后根据事件时间进行聚合操作,并创建新的JSON对象和Data对象。最后,我们使用union函数将处理后的数据流合并为一个数据流,并使用process函数将数据发送到下一个算子以进行处理。

上面的代码,不同type的data聚合时间不同,有些data需要1小时聚合一次,有些data需要4小时聚合一次,能否写详细一些

如果不同type的data需要根据不同的时间窗口进行聚合操作,您可以使用Flink的assignTimestampsAndWatermarks函数为每个记录分配事件时间,并使用keyBy函数将数据流分区为具有相同类型的记录。接下来,您可以使用window函数和reduce函数对每个窗口中的数据进行聚合操作。在window函数中,您可以使用EventTimeSessionWindowsEventTimeProcessingTimeWindows等窗口类型,并根据数据的类型和事件时间选择不同的时间窗口长度。以下是示例代码:

Copy
// POJO class for JSON data
public class Data {
    public String type;
    public String data;

    public Data() {}

    public Data(String type, String data) {
        this.type = type;
        this.data = data;
    }
}

DataStream<String> jsonDataStream = ... // your JSON data stream

// map JSON string to Data object and assign event time
DataStream<Data> dataStream = jsonDataStream
    .map(jsonStr -> {
        // parse JSON string into Data object
        JSONObject jsonObj = new JSONObject(jsonStr);
        String type = jsonObj.getString("type");
        String data = jsonObj.getString("data");
        // assign event time based on data type
        long eventTime = 0;
        if (type.equals("type1")) {
            eventTime = jsonObj.getLong("timestamp");
        } else if (type.equals("type2")) {
            eventTime = jsonObj.getLong("timestamp") + 4 * 60 * 60 * 1000; // 4 hours later
        } else {
            // add more types as needed
        }
        return new Data(type, data);
    })
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Data>(Time.minutes(1)) {
        @Override
        public long extractTimestamp(Data data) {
            // extract event time from data
            JSONObject jsonObj = new JSONObject(data.data);
            long eventTime = 0;
            if (data.type.equals("type1")) {
                eventTime = jsonObj.getLong("timestamp");
            } else if (data.type.equals("type2")) {
                eventTime = jsonObj.getLong("timestamp") + 4 * 60 * 60 * 1000; // 4 hours later
            } else {
                // add more types as needed
            }
            return eventTime;
        }
    });

// keyBy type field
KeyedStream<Data, String> keyedStream = dataStream
    .keyBy(data -> data.type);

// window and reduce
DataStream<Data> processedDataStream = keyedStream
    .window(EventTimeSessionWindows.withGap(Time.hours(1)))
    .reduce(new ReduceFunction<Data>() {
        @Override
        public Data reduce(Data data1, Data data2) throws Exception {
            // parse JSON string into JSONObject
            JSONObject jsonObj1 = new JSONObject(data1.data);
            JSONObject jsonObj2 = new JSONObject(data2.data);
            // aggregate fields based on event time
            double field1 = Math.max(jsonObj1.getDouble("field1"), jsonObj2.getDouble("field1"));
            int field2 = jsonObj1.getInt("field2") + jsonObj2.getInt("field2");
            // add more fields as needed
            // create new JSON object
            JSONObject newJsonObj = new JSONObject();
            newJsonObj.put("field1", field1);
            newJsonObj.put("field2", field2);
            // add more fields as needed
            // create new Data object
            Data newData = new Data(data1.type, newJsonObj.toString());
            return newData;
        }
    });

// union processed data streams
DataStream<Data> unionedDataStream = processedDataStream1
    .union(processedDataStream2)
    .union(processedDataStream3)
    // add more data streams as needed
    .keyBy(data -> data.type)
    .process(new ProcessFunction<Data, Data>() {
        @Override
        public void processElement(Data data, Context context, Collector<Data> collector) throws Exception {
            collector.collect(data);
        }
    });
在上面的代码中,我们为每个记录分配了事件时间,并使用keyBy函数将数据流分区为具有相同类型的记录。接下来,我们使用window函数和reduce函数对每个窗口中的数据进行聚合操作。在window函数中,我们使用EventTimeSessionWindows窗口类型,并根据数据的类型和事件时间选择不同的时间窗口长度。例如,对于type1

物理分区

keyBy是逻辑分区,至于分得均不均匀,每个key的数据要到哪一区去,这些是完全无从控制的。

因为按key划分,可能分出很多key,远远超过slot数

物理分区可以控制分区策略,精准调配数据.

例如我们编写的程序,可能对多个处理任务或者说算子设置了不同的并行度,那么当数据执行的上下游任务并行度变化时,数据就不应该在当前分区以直通方式传输了,因为如果并行度变小,当前分区可能没有下游任务了;而如果并行度变大,所有数据还在原先的分区处理就会导致资源的浪费。所以这种情况下,系统会自动地将数据均匀地发往下游所有并行任务,保证各个分区负载均衡。

// 随机分区
stream.shuffle().print().setParallelism(4);
// 轮询分区
stream.rebalance().print().setParallelism(4);

输出算子

flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

建立连接,释放连接,在继承自RichFunction的类里就可以实现这些。

但是一般不这样做。flink把向外部系统写入这步操作抽象提炼成了sink。

dataStream的print()方法,内部也是实现了addSink()

    @PublicEvolving
    public DataStreamSink<T> print() {
        PrintSinkFunction<T> printFunction = new PrintSinkFunction();
        return this.addSink(printFunction).name("Print to Std. Out");
    }

PrintSinkFunction继承自RichSinkFunction

SinkFunction多数情况下并不需要我们自己实现。

像kafka之类流式系统,flink提供了完美对接,source、sink两端都能连,可读可写,而对于es、文件系统、jdbc等数据存储系统,则只提供了输出写入的sinkFunction

输出到文件

public class SinkToFileTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Bob", "./home", 4000L),
                new Event("Bob", "./prod?id=2", 3500L),
                new Event("Bob", "./prod?id=3", 3800L)
        );
        StreamingFileSink<String> fileSink =
                StreamingFileSink.<String>forRowFormat(new Path("./output"),
                        //指定滚动策略
                        new SimpleStringEncoder<>("UTF-8"))
                        .withRollingPolicy(
                                DefaultRollingPolicy.builder()
                                        //指定文件大小,达到多少就归档保存
                                        .withMaxPartSize(1024 * 1024 * 1024)
                                        //定义隔多长时间滚动一次,开启一个新的文件
                                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                        // 指定多久没有数据到来,文件就归档保存
                                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                        .build()
                        )
                        .build();
        stream.map(Event::toString).addSink(fileSink);
    }
}

输出到kafka

kafka是消息队列,处理的也是流式数据,和flink天生一对。

flink与kafka的连接器提供了端到端的精确一次的语义保证,这在实际项目中是最高级别的一致性保证。

public class SinkToKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //kafka配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxxxx:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        //从Kafka获取数据流
        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));

        // 读到的是String,要转换成event
        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0], fields[1], Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 结果数据写入kafka
        result.addSink(new FlinkKafkaProducer<String>(
                "xxxxx:9092", "events", new SimpleStringSchema()
        ));


        env.execute();
    }
}

第七章 时间和窗口

在流处理应用中,很重要很常见的操作就是窗口计算。

所谓的窗口一般就是划定的一段时间范围,对这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。

时间语义

image-20230411104546144

事件产生的时间,进入Flink的时间,以及被某个算子处理的时间完全是不一样的。

事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被Flink系统的source算子读取消费,进而向下游的窗口算子传递,最后进行处理计算。

这里有两个非常重要的时间点,一个是数据产生的时间,一个是数据进入窗口真正被处理的时间,这两个时间不一样,我们定义的窗口操作到底以哪种时间作为衡量标准,就是所谓的时间语义。

在计算机系统中,考虑数据处理的时间是没什么意义的,我们更关心的,显然是数据本身产生的时间。所以在实际应用中,事件时间语义会更常见,也就是事件产生时间而不是事件处理时间。一般情况下,业务日志数据处理中都会记录数据生成的时间戳,它就可以作为事件时间判断的基础。

事件时间,也就是数据生成的时间,数据一旦产生,这个时间就确定了,所以它可以作为一个属性嵌入到数据中,这其实就是这条数据的时间戳。

在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在Flink中把它叫做事件时间的水位线(watermarks)

水位线

image-20230411111028715

在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个逻辑时钟,这个时钟的时间不会自动流逝,它的时间进展,就是靠着新到数据的时间戳来推动的。

这样的好处在于,计算的过程可以完全不依赖于处理时间(系统时间)。

在Flink中,用来衡量事件时间进展的标记,就被称作水位线。

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就是在某个数据到来之后。

image-20230411112932546

  1. 有序流中的水位线

    在理想状态下,数据应该按照他们的先后顺序进入流中,这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长,事件时钟不断向前推进。

    实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳,插入水位线就做了大量的无用功,而且即使时间戳不同,同时涌来的数据时间差会非常小,往往对处理计算也没什么影响。所以为了提高效率,一般每隔一段时间就会生成一个水位线,这个水位线的时间戳就是当前最新数据的时间戳,所以这时的水位线,其实就是有序流的一个周期性出现的时间标记。

    image-20230411113818749

  2. 乱序流中的水位线

    在分布式系统中,数据在节点中传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的乱序数据。

    一个7秒时产生的数据,生成时间自然要比9秒的数据早,但是经过数据缓存和传输之后,处理任务可能先收到了9秒的数据,之后才是7秒的数据,这就是乱序。

    解决思路:

    还是靠数据来驱动,不依靠真正的系统时间,每来一个时间就提取它的时间戳,插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时间戳大,才能推动时间推进,才插入水位线。

    image-20230411115327839

水位线watermark(t),表示在当前流中,事件时间已经达到了时间戳t,这代表t之前的所有数据都到齐了,之后流中不会出现小于时间戳t的数据。

设置水位线的方法

assignTimestampsAndWatermarks()里面要传一个参数,就是watermark的生成策略。

水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,所以是可以考虑在生成水位线之前使用。

public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);
        DataStream<Event> stream = env.fromElements(
                        new Event("Mary", "./home", 1000L),
                        new Event("Bob", "./cart", 2000L),
                        new Event("Alice", "./prod?id=100", 3000L),
                        new Event("Bob", "./prod?id=1", 3300L),
                        new Event("Bob", "./home", 4000L),
                        new Event("Bob", "./prod?id=2", 3500L),
                        new Event("Bob", "./prod?id=3", 3800L)
                )
                // 有序流的watermark生成
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
//                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
//                    @Override
//                    public long extractTimestamp(Event event, long recordTimestamp) {
//                        return event.timestamp;
//                    }
//                }));
                //乱序流的watermark生成,要指定延迟时间,处理迟到数据
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long recordTimestamp) {
                                return event.timestamp;
                            }
                        }));

        env.execute();
    }
}

水位线的生成,一定要有水位线的生成器和时间戳的提取器。

窗口

概念

在大数据流式计算中,基于时间最常见的操作就是窗口,最典型的利用时间来控制输出的就是窗口。

我们很容易把窗口想象成一个固定位置的“框”,数据源源不断流过来,到某个时间点窗口该关闭了,就停止收集数据、触发计算并输出结果。

分类

在flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。

按照驱动类型分类

窗口本身是截取有界数据的一种方式,以什么标准来开始和结束数据的截取。

按照时间段去截取数据的窗口就叫做时间窗口,这在实际应用中最常见,除了由时间驱动之外,窗口其实也可以由数据驱动,也就是说按照固定的个数,来截取一段数据集,这叫计数窗口。

时间窗口

  • 滚动窗口
  • 滑动窗口
  • 会话窗口

计数窗口

全局窗口

这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中,这种窗口没有结束的时候,默认是不会触发计算的,如果希望它能对数据进行计算处理,还需要自定义触发器trigger

窗口API

  1. 按键分区和非按键分区

    在定义窗口操作之前,首先需要确定,是基于按键分区的数据流KeyedStream来开窗,还是直接在没有分区的DataStream上开窗,也就是说,在调用窗口算子之前,是否有keyBy操作。

    按键分区窗口

    经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流,这就是keyedStream,基于keyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。

    非按键分区窗口

    如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流,这时窗口逻辑只能在一个任务上执行,这就相当于并行度为1.所以在实际应用中一般不推荐使用这种方式。

    这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作。

窗口分配器

定义几种窗口的方法

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);
        DataStream<Event> stream = env.fromElements(
                        new Event("Mary", "./home", 1000L),
                        new Event("Bob", "./cart", 2000L),
                        new Event("Alice", "./prod?id=100", 3000L),
                        new Event("Bob", "./prod?id=1", 3300L),
                        new Event("Bob", "./home", 4000L),
                        new Event("Bob", "./prod?id=2", 3500L),
                        new Event("Bob", "./prod?id=3", 3800L)
                )
                // 有序流的watermark生成
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
//                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
//                    @Override
//                    public long extractTimestamp(Event event, long recordTimestamp) {
//                        return event.timestamp;
//                    }
//                }));
                //乱序流的watermark生成,要指定延迟时间,处理迟到数据
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long recordTimestamp) {
                                return event.timestamp;
                            }
                        }));
        stream.keyBy(data -> data.user)
                .countWindow(10, 2);// 计数窗口, 传1个参数就是滚动窗口,传2个参数就是滑动窗口
//                .window(EventTimeSessionWindows.withGap(Time.seconds(2)));//事件时间会话窗口
//                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));// 滑动时间窗口,1小时,步长5min
//                .window(TumblingEventTimeWindows.of(Time.hours(1))); // 定义1小时的滚动时间窗口
        env.execute();
    }
}

到这里还没完,还需要用窗口函数,对分配到窗口内的数据做处理。

窗口函数

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,必须再接上一个定义窗口如何进行计算的操作,这就是窗口函数。

image-20230412140212223

  1. 增量聚合函数

    窗口将数据收集起来,最基本的处理操作就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是增量聚合。

    典型的增量聚合函数有两个:ReduceFunction和AggregateFunction

    ReduceFunction:

    public class WindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // 设置生成水位线的时间间隔
            env.getConfig().setAutoWatermarkInterval(100);
            DataStream<Event> stream = env.fromElements(
                            new Event("Mary", "./home", 1000L),
                            new Event("Bob", "./cart", 2000L),
                            new Event("Alice", "./prod?id=100", 3000L),
                            new Event("Bob", "./prod?id=1", 3300L),
                            new Event("Bob", "./home", 4000L),
                            new Event("Bob", "./prod?id=2", 3500L),
                            new Event("Bob", "./prod?id=3", 3800L)
                    )
                    // 有序流的watermark生成
    //                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
    //                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
    //                    @Override
    //                    public long extractTimestamp(Event event, long recordTimestamp) {
    //                        return event.timestamp;
    //                    }
    //                }));
                    //乱序流的watermark生成,要指定延迟时间,处理迟到数据
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                @Override
                                public long extractTimestamp(Event event, long recordTimestamp) {
                                    return event.timestamp;
                                }
                            }));
            stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2 map(Event event) throws Exception {
                            return Tuple2.of(event.user, 1L);
                        }
                    })
                    .keyBy(data -> data.f0)
    //                .countWindow(10, 2);// 计数窗口, 传1个参数就是滚动窗口,传2个参数就是滑动窗口
    //                .window(EventTimeSessionWindows.withGap(Time.seconds(2)));//事件时间会话窗口
    //                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)));// 滑动时间窗口,1小时,步长5min
                    .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 定义1小时的滚动时间窗口
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                        }
                    }).print();
            
            env.execute();
        }
    }
    

    AggregateFunction

    AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN),累加器类型(ACC)和输出类型(OUT)

    接口中有四个方法:

    • createAccumulator()
    • add()
    • getResult()
    • merge()
  2. 全窗口函数

    有些场景下,要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义。

第八章 分流合流

分流

public class SplitStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 设置生成水位线的时间间隔
        env.getConfig().setAutoWatermarkInterval(100);
        DataStream<Event> stream = env.fromElements(
                        new Event("Mary", "./home", 1000L),
                        new Event("Bob", "./cart", 2000L),
                        new Event("Alice", "./prod?id=100", 3000L),
                        new Event("Bob", "./prod?id=1", 3300L),
                        new Event("Bob", "./home", 4000L),
                        new Event("Bob", "./prod?id=2", 3500L),
                        new Event("Bob", "./prod?id=3", 3800L)
                )
                // 有序流的watermark生成
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
//                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
//                    @Override
//                    public long extractTimestamp(Event event, long recordTimestamp) {
//                        return event.timestamp;
//                    }
//                }));
                //乱序流的watermark生成,要指定延迟时间,处理迟到数据
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long recordTimestamp) {
                                return event.timestamp;
                            }
                        }));
        // 定义输出标签
        OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary");
        OutputTag<Tuple3<String, String, Long>> bobTag = new OutputTag<Tuple3<String, String, Long>>("Bob");
        SingleOutputStreamOperator<Event> process = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event event, ProcessFunction<Event, Object>.Context ctx, Collector<Object> out) throws Exception {
                if (event.user.equals("Mary")) {
                    ctx.output(maryTag, Tuple3.of(event.user, event.url, event.timestamp));
                } else if (event.user.equals("Bob")) {
                    ctx.output(bobTag, Tuple3.of(event.user, event.url, event.timestamp));
                } else {
                    out.collect(event);
                }
            }
        });

        env.execute();
    }
}
Last Updated:
Contributors: 陈杨