在ApacheBeam中避开Kotlin雷区

在这篇文章中,您将了解Kotlin和Beam之间的一些独特交互作用,这将帮助您避免开始时可能出现的一些潜在的地雷陷阱,因此您可以专注于这两种技术之间的丰富经验。

毫无疑问,JavaSDK是ApacheBeam支持的语言中最受欢迎和功能最全的,如果您将Java的现代,开放源代码表兄弟Kotlin的功能发挥到极致,那么您将发现自己拥有出色的开发人员经验。与大多数伟大的人际关系一样,并非一切都完美无缺,而Beam-Kotlin并非一无是处。

这篇文章将介绍这两种技术之间的一些独特交互作用,并帮助您避免开始时可能会遇到的潜在地雷陷阱,因此您可以专注于Kotlin和Beam之间的丰富经验。

声明匿名ParDos/DoFns

在网上搜寻示例时,通常会看到类似以下内容的内容,这些内容创建了一个要在ParDo中使用的匿名DoFn:

lines.apply("Extract Words", ParDo.of(new DoFn<String, String>() { ... }));1复制代码类型:[java]

简单地转换为Kotlin将产生以下结果:

lines.apply("Extract Words", ParDo.of(DoFn<String, String>() { ... }))1复制代码类型:[java]

但是,您会发现这会导致类型擦除,而Beam会抱怨它。而是为了实现匿名功能,必须指示对象明确地从DoFn继承:

lines.apply("Extract Words", ParDo.of(object : DoFn<String, String>() { ... }))1复制代码类型:[java]

定义元组标签

如果您要处理涉及多种类型的转换或操作,则TupleTags可能是非常宝贵的,而且是必要的。但是,您可能会发现与这些声明有关的问题冒出泡沫,导致您要么显式要求使用Coder来通过以下setCoder()函数定义代码,检索特定标签。

死掉的赠品将是以下错误:

线程“main”中的异常java.lang.IllegalStateException:无法返回的默认编码器Transform.out1[PCollection]。纠正以下根本原因之一:尚未手动指定编码器;您可以使用.setCoder()。从CoderRegistry失败中推断出编码器:无法为V提供编码器。使用已注册的CoderProvider失败来构建编码器。有关详细的失败,请参见抑制的异常。使用生产环境中的默认输出编码器PTransform失败:无法为V提供编码器。使用已注册的编码器构建CoderProvider失败。

如果遇到这种情况,很可能将TupleTags定义如下:

val userTag = TupleTag<KV<String, User>>()1复制代码类型:[java]

不幸的是,与Kotlin中的大多数问题一样,类型擦除可能是一个问题。为避免此问题,在定义TupleTag并使用对象实现模式时,您需要尽可能明确:

val usersTag = object: TupleTag<KV<String, User>>() {}1复制代码类型:[java]

使用对象和尾随的开闭花括号可以使特定类型在尝试从标记中读取时不会丢失。

IntelliJ生成的替代

IntelliJ最吸引人的功能之一就是能够在实现或从另一个类/接口继承时,IDE为您生成任何丢失的替代。由于Kotlin的类型检查系统,这可能是一个挑战,因为Kotlin明确使用了?。字符表示可以为空,但是Beam会要求您确保类型完全匹配。

考虑以下功能:

class ExampleTransform: PTransform<PCollection<KV<String, Test>>, PCollectionTuple>() {
 // Omitted for brevity}123复制代码类型:[java]

您知道您需要在此处执行某种类型的操作,因此您可以利用自己的IDE并允许它生成适当的替代:

执行此操作时,您会看到将添加所有类型的可空实例,显式添加参数:

// Notice the trailing ? after the type definition the inputoverride fun expand(input: PCollection<KV<String, Test>>?): PCollectionTuple {
 TODO("Not yet implemented")
}1234复制代码类型:[java]

Beam在类型和可空性方面非常明确,因此您需要确保PCollection在这种情况下,不会使用可空运算符修饰:

override fun expand(input: PCollection<KV<String, Test>>): PCollectionTuple {
 TODO("Not yet implemented")
}123复制代码类型:[java]

可迭代,但是哪个?

Java和Kotlin都具有Iterable用于处理项目集合的接口的概念,但是,当通过分组/批处理操作(例如,GroupIntoBatchs转换)来利用它们时,在类型之间可能会发生Kotlin-JavaJVM断开连接。

pipeline
 .apply("Batch Items", GroupIntoBatches.ofSize<Key, Value>(100))
 .apply("Apply Batching Transform", ParDo.of(SomeTransform.transform()))123复制代码类型:[java]

您可能会遇到类似以下内容的错误:

ProcessContext参数必须具有DoFn<Iterable<?类型。扩展Value>,Result<?扩展Value>>。ProcessContext。

**您可以通过@JvmWildcard以下方式为可迭代的类型(而不是DoFn上的可迭代本身)添加注释来解决此问题:

class SomeTransform: DoFn<KV<Key, Iterable<Value>>, KV<Key, Value>>(){
  // Omitted for brevity }123复制代码类型:[java]

对此:

class SomeTransform: DoFn<KV<Key, Iterable<@JvmWildcard Value>>, KV<Key, Value>>(){
  // Omitted for brevity}123复制代码类型:[java]

JVM的此提示应允许它确定要使用的接口的正确版本,并由Beam编程模型对其进行序列化/反序列化。

编写管道测试

在编写Beam应用程序时(尤其是总是),测试(尤其是单元测试)非常重要,但是在与Kotlin一起工作时,您应该注意测试部门中的两个主要陷阱:

定义管道

应用PAsserts

运行管道测试

定义管道

由于针对Beam管道编写单元测试时应用的原生PAsserts依赖于原生Java代码,因此在Beam中使用它们时将需要一些注释。您可以使用以下示例作为构造方法的示例:

@get:Rule@Transitiveval testPipeline: TestPipeline = TestPipeline.create()123复制代码类型:[java]

您所有的单个单元测试都可以共享此管道,但是您应该考虑完全按照上面的方式编写它,因为@get:Rule和@Transitive注释都是必需的,显式类型声明(例如:TestPipeline)也是必需的。

应用PAsserts

该PAssert库与Beam捆绑在一起,可让您针对PCollection对象显式编写测试(例如,您可以针对它们的内容编写断言,验证其内容等)。这些通常只会按预期“工作”,但是使用该.satisfies()功能时有一个特别的警告:

PAssert.that(numbers).satisfies { elements ->
 assertTrue(elements.contains(42))
}123复制代码类型:[java]

您会发现这将不起作用,因为satisfies()函数明确希望返回JavaVoid。由于Kotlin中不存在此功能,因此您需要在函数主体的末尾显式放置一个null:

PAssert.that(numbers).satisfies { elements ->
 assertTrue(elements.contains(42)) null // Required}1234复制代码类型:[java]

运行管道测试

尝试自己实际执行或运行测试时,可能会遇到麻烦。在定义PAssert之后,您必须确保管道本身已明确运行,直到完成:

PAssert.that(numbers).containsInAnyOrder(42)
testPipeline.run().waitUntilFinish()12复制代码类型:[java]

由于PAssert构造为执行管道的动态非循环图的一部分,因此必须在运行测试之前将其声明。您还会发现,如果缺少该run()声明,则将无法调试任何ParDo级别的操作。

(0)

相关推荐