目录

Hello,flink

简介

Flink是一个框架分布式处理引擎,用于对无界和有界数据流进行状态计算

是一个大数据的引擎,有状态的数据计算。

关键是数据流

为什么选择Flink

  • 流数据更真实的反映了我们的生活方式

    真实的情况是,数据是源源不断的,比如订单、弹幕、日志、用户行为、聊天信息等。

    但是如果要是来一条处理一条,对机器的性能要求会比较高,比较好的方法是让它攒一波再处理。

  • 传统的数据架构师基于有限数据集的

    spark streaming可以做这个事情,但是还是有一定的延迟(秒级)

    现在我们要做的是做到毫秒级,并且低延迟、高吞吐,这就是flink要做的。

  • 我们的目标

    • 低延迟

    • 高吞吐

    • 结果的准确性和良好的容错性

      分布式系统在传输的过程中有可能会乱序,导致数据出现一些问题。

      容错性:有一个节点挂了之后,系统要能够会滚到一个比较近的状态

flink的应用

电商和市场营销

​ 数据报表、广告投放、业务流程需要

物联网

​ 传感器实时数据采集和显示、实时报警、交通运输业

电信业

​ 基站流量调配

银行和金融业

​ 实时结算和通知推送,实时检测异常行为

数据特别大的时候计算起来会很慢,用hive,sql语句甚至可能达到几个小时那么多

我们可以变通一下,只要来一个数据,我们就把数据算一下。

数据处理的结构

传统的数据处理架构

所有发过来的数据都是一个事件

优点

  • 实时性很好

缺点

  • 就是并发性太差了

数据越来越多的时候该怎么办?

分析处理

数据显暂时放在业务数据库里面,需要数据的时候先ETL数据清洗,然后再放在数据仓库里面,然后再响应

优点

  • 不用做联表查询

缺点

  • 实时性没有那么好

有状态的流式处理

直接把数据放在本地内存中,用本地内存的状态,代替了表,在高并发的时候可以做集群。

如果想要保持数据持久化,设置一个周期检查点(CheckPoint),定期缓存。

缺点

  • 分布式情况下,因为网络的原因,数据的顺序就有了一些问题

第二代流处理

https://cdn.cjpa.top/cdnimages/image-20210308011228616.png

lambda架构

用两套系统(流处理、批处理),同时保证低延迟和结果准确

流处理保证速度,把处理结果放在结果表中,同时数据也做一个批处理,最后把batch和speed表结合,得到结果。

用户看到的结果是很快的有一个数据的结果(不准确),隔一段时间之后,再获取精确的结果

缺点

  • 需要维护两套系统
  • 两个表算的不一样,实际开发的话会很麻烦

流处理低延迟,但是结果不是很准确,批处理结果稳定,延迟比较少

第三代流处理

Flink

flink的特点

  • 事件驱动

https://cdn.cjpa.top/image-20210317153712907.png

要有一个程序一直运行着,监听数据的到来。

  • 基于流的世界观

    在flink的世界观中,一切都是流组成的,离线数据是有界的流

    实时数据是一个没有界限的流,这就是所谓的有界流和无界流

    https://cdn.cjpa.top/image-20210317154249733.png

  • 分层api

    https://cdn.cjpa.top/image-20210317154319489.png

    • 越往上越抽象,表达含义越简明,使用越方便
    • 越底层越具体,表达能力越丰富,使用越灵活

SQL和Table API还在丰富和发展过程中,阿里的blink做的还挺强。data stream api层用的最多,批处理用的是data set api

如果data stream还不够用的话,可以用processFunction api来处理。

  • 精确一次(exactly-once)的状态一致性保证
  • 低延迟,每秒处理百万个事件,毫秒级延迟
  • 与众多常用的存储系统的连接
  • 高可用,动态沱镇,实现7*24小时全天候运行

流(stream)和微批(micro-batching)

https://cdn.cjpa.top/image-20210317155127194.png

https://cdn.cjpa.top/image-20210317155135753.png

数据模型

  • spark采用RDD模型,spark streaming的DStream实际上也就是一组组小批数据RDD的集合
  • flink基本数据模型是数据流,以及事件(Event)序列

运行时架构

  • spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
  • flink是标准的流执行模式,一个事件在一个节点处理完后可以直接法网下一个节点进行处理

简单操作

scale版 配置maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-learning-01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 主要做编译-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.0</version>
                <executions>
                    <execution>
                        <!-- 声明绑定到maven的compile阶段 -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 用来打包的 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
<!--                    后缀,表示把依赖都打进去 -->
                    <descriptorRefs>
                        <descriptorRef>jar-width-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

java版依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-learning-01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>

批处理

// 批处理
public class WordCount {
    public static void main(String[] args) throws Exception{
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 从文件中读取数据
        String inputPath = "/Users/cjp/bilibili-workspace/flink-learning-01/src/main/resources/hello.txt";
        DataSource<String> inputDataSet = env.readTextFile(inputPath);
        // 对数据集进行处理,按照空格分词展开,转换成(word,1)二元组进行统计
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyflatMapper()).groupBy(0) // 按照第一个位置的word分组
        .sum(1);// 将第二个位置上的数据求和
        resultSet.print();
    }

    // 自定义类,实现flatMapFunction接口
    public static class MyflatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // 按空格分词
            String[] words = value.split(" ");
            // 遍历所有word,包成二元组输出
            for(String word:words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

流处理

public class StreamWordCount {
    public static void main(String[] args) throws Exception{
        // 创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置执行线程
        /*
        前面的标记就是线程数
        2> (javascript,1)
        1> (hello,1)
        2> (vue,1)
        3> (golang,1)
        1> (hello,2)
        1> (hello,3)
        1> (java,1)
        1> (hello,4)
         */
        env.setParallelism(3);


        // 从文件中读取数据
//        String inputPath = "/Users/cjp/bilibili-workspace/flink-learning-01/src/main/resources/hello.txt";
//        DataStream<String> inputDataStream = env.readTextFile(inputPath);

        // 用parameter tool工具从程序启动参数中提取配置项
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        // 从socket文本流读取数据
        DataStream<String> inputDataStream = env.socketTextStream(host, port);

        // 基于数据流进行转换计算
        SingleOutputStreamOperator resultStream = inputDataStream.flatMap(new WordCount.MyflatMapper())
                // 按照当前key的hash code进行重分区
                .keyBy(0)
                .sum(1);
        resultStream.print();

        // 执行任务
        env.execute();
    }

}

设置启动参数

https://cdn.cjpa.top/image-20210326163835212.png

控制台开启7777端口,模拟业务中不断的发送数据

➜  flink-learning-01 nc -lk 7777
hello
你好,我叫陈家鹏 哈哈
这是一个linux的端口 可以用来发送数据

控制台输出如下

1> (这是一个linux的端�端口,1)
3> (可以用来发送数据,1)

部署

Standalone模式

不依赖于其他的工具,可以自己单独部署

安装

解压缩flink-1.10.1-bin-scala_2.12.tgz(http://archive.apache.org/dist/flink/flink-1.10.1/flink-1.10.1-bin-scala_2.12.tgz),进入到conf目录中

Bin

├── bash-java-utils.jar
├── config.sh
├── find-flink-home.sh
├── flink	# 最重要的命令,如果想要在集群上提交一个作业,停止,取消,都可以用flink命令
├── flink-console.sh
├── flink-daemon.sh
├── historyserver.sh
├── jobmanager.sh	# 管理作业,比较核心
├── kubernetes-entry.sh
├── kubernetes-session.sh
├── mesos-appmaster-job.sh
├── mesos-appmaster.sh
├── mesos-taskmanager.sh
├── pyflink-gateway-server.sh
├── pyflink-shell.sh
├── sql-client.sh
├── standalone-job.sh
├── start-cluster.sh	# 启动集群
├── start-scala-shell.sh
├── start-zookeeper-quorum.sh
├── stop-cluster.sh	# 停止集群
├── stop-zookeeper-quorum.sh
├── taskmanager.sh
├── yarn-session.sh
└── zookeeper.sh

conf

.
├── flink-conf.yaml
├── log4j-cli.properties
├── log4j-console.properties
├── log4j-yarn-session.properties
├── log4j.properties
├── logback-console.xml
├── logback-yarn.xml
├── logback.xml
├── masters
├── slaves
├── sql-client-defaults.yaml
└── zoo.cfg

1)修改flink/conf/flink-conf.yaml文件

jobmanager.heap.size: 1024m	# job manager的堆内存大小(因为jobmanager运行在jvm上面)
taskmanager.memory.process.size: 1728m	# 当前taskmanager整个内存的内存的总大小
# taskmanager既有堆内存也有堆外内存,flink是有状态的流式计算,这些状态是放在内存中的
taskmanager.numberOfTaskSlots: 1# 不同的插槽,不同的slot上可以运行不同的线程
parallelism.default: 1	# 默认并行度,默认用一个线程执行
# numberOfTaskSlots和parallelism的区别,
# numberOfTaskSlots是表示最大的能力,并且是针对一个taskmanager
# parallelism是实际跑多少个,针对总体

2)cat masters

# 提交job的入口,jobmanager地址
localhost:8081

3)cat slaves

# taskmanager地址
localhost

4)运行

切到bin

./start-cluster.sh

https://cdn.cjpa.top/image-20210326202414119.png

available和配置文件中的taskmanager.numberOfTaskSlots: 1一致

https://cdn.cjpa.top/image-20210326202549097.png

命令提交job

1)提交

./bin/flink run -c com.atguigu.ec.StreamWordCount -p 3 /Users/cjp/bilibili-workspace/flink-learning-01/target/flink-learning-01-1.0-SNAPSHOT.jar --host localhost --port 7777
# c 启动类
# p 并行度
# 后面的就是启动参数

2)查看所有job

./bin/flink list

3)取消

./bin/flink cancel job_id

k8s和yarn部署

yarn

yarn有两种模式,分别为Session-Cluster和Per-Job-Cluster两种模式

  • Session-Cluster

https://cdn.cjpa.top/image-20210327195340690.png

Session-Cluster模式需要先启动集群,然后提交作业,接着会想yarn申请一块空尽之后,资源永远保持不变,如果资源满了,下一个作业就无法提交,只能等到yarn中的启动一个作业执行完之后,释放了资源,下一个作业才会正常提交。

所有的作业共享Dispather和ResourceManager共享资源,适合规模小,执行时间短的作业

在yarn中初始化一个flink集群,开辟指定的资源,以后提交任务都会向这里提交,这个flink集群会常驻在yarn集群中,除非手工停止。

  • Per-Job-Cluster

一个job会对应一个集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,知道作业执行完成,一个作业的失败与否,并不会影响下一个作业的正常提交和运行,独享Dispatcher和ResourceManager,按需接受资源申请,适合规模大长时间运行的作业

每次提交都会创建一个新的flink集群,任务之间相互独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失