Flink:本地执行(Local Execution) – 过往记忆

文章目录

  Flink可以在单台机器上运行,甚至是单个Java虚拟机(Java Virtual Machine)。这种机制使得用户可以在本地测试或者调试Flink程序。本节主要概述Flink本地模式的运行机制。

  本地环境和执行器(executors)运行你在本地的Java虚拟机上运行Flink程序,或者是在属于正在运行程序的如何Java虚拟机上。对于大部分示例程序而言,你只需简单的地点击你IDE上的运行(Run)按钮就可以执行。

  Flink支持两种不同的本地执行模式:(1)、LocalExecutionEnvironment 启动了Flink完整的运行环境,包括一个JobManager和一个TaskManager。运行环境中包括了内存管理,并且所有的内部算法再试在集群模式下运行的。(2)、CollectionEnvironment在Java集合上运行Flink程序。这种模式下不会启动Flink完整的执行环境,所以这种模式负载非常低,而且非常轻量级。例如 DataSet.map()-转换操作,执行器将是对Java list里的所有元素进行map()函数的操作。

调试

  如果你本地运行Flink程序,你可以像普通的Java程序一样调试Flink程序。 你既可以使用System.out.println()打印一些内部变量,或者使用调试工具。 你还可以在map(), reduce() 或者其他方法中打断点。更多的详情请参见Java API文档中的调试章节,以便了解如何使用Java API工具进行测试或者本地调试等。

Maven依赖

  如果你在Maven工程中开发你的程序,你需要将flink-clients模块加入到你项目的依赖中:

<dependency>
  <groupid>org.apache.flink</groupid>
  <artifactid>flink-clients_2.10</artifactid>
  <version>1.1-SNAPSHOT</version>
</dependency>

本地执行

  LocalEnvironment是在本地模式运行Flink程序的句柄,可以用它在本地的 JVM (standalone 或嵌入其他程序)里运行程序。本地执行环境是通过调用ExecutionEnvironment.createLocalEnvironment()方法实现的。默认情况下,Flink会根据你机器的CPU核数开启同样多的线程来执行程序。当然,我们可以指定程序的并行度。本模式的日志可以通过enableLogging()/disableLogging()来设置是否向标准输出输出。

  在大多数情况下,推荐使用ExecutionEnvironment.getExecutionEnvironment()来获得Flink的运行环境。 当程序是在本地(未使用命令行接口)启动时,该方法会返回LocalEnvironment,当程序是通过命令行接口命令行接口(CLI)提交时,则该方法会返回集群的执行环境。

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
    DataSet<String> data = env.readTextFile("file:///path/to/file");
    data
        .filter(new FilterFunction<String>() {
            public boolean filter(String value) {
                return value.startsWith("http://");
            }
        })
        .writeAsText("file:///path/to/result");
    JobExecutionResult res = env.execute();
}

JobExecutionResult对象在程序执行结束时会返回,这个类中包含了程序的运行环境和累加器的结果。

LocalEnvironment 类也可以向Flink传入一个用户自定义的配置选项。

Configuration conf = new Configuration();
conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
注意:本地执行环境不会启动任何web前端来监控执行过程。

集合环境

  在Java集合上执行操作将会使用CollectionEnvironment类,这个类执行Flink程序时使用了一些低开销的方法。 这种模式通常用于自动化测试、调试、代码重用等场景。

  用户可以实现用于批处理的算法,或者是实现用于更具交互行的场下的算法。一个Flink程序通过稍微修改即可用于处理请求的Java应用服务器。下面是集合环境的例子:

public static void main(String[] args) throws Exception {
    // initialize a new Collection-based execution environment
    final ExecutionEnvironment env = new CollectionEnvironment();
    DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);
    /* Data Set transformations ... */
    // retrieve the resulting Tuple2 elements into a ArrayList.
    Collection<...> result = new ArrayList<...>();
    resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
    // kick off execution.
    env.execute();
    // Do some work with the resulting ArrayList (=Collection).
    for(... t : result) {
        System.err.println("Result = "+t);
    }
}

在flink-examples-batch模块中有完整的示例,类名是CollectionExecutionExample .

  值得注意的是,基于集合环境的 Flink 程序只适用于小数据, 不能超过JVM的堆内存大小 集合模式中的执行器只使用了单线程,而不是多线程。

(0)

相关推荐