Skip to content

Commit a05a535

Browse files
authored
Rework com.spotify.scio.transforms package (spotify#3454)
1 parent 7538348 commit a05a535

File tree

12 files changed

+582
-357
lines changed

12 files changed

+582
-357
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2020 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.spotify.scio.transforms
18+
19+
import scala.util.{Failure, Success, Try}
20+
21+
object JavaAsyncConverters {
22+
23+
/** Enhanced version of `AsyncLookupDoFn.Try` with convenience methods. */
24+
implicit class RichAsyncLookupDoFnTry[A](private val self: BaseAsyncLookupDoFn.Try[A])
25+
extends AnyVal {
26+
27+
/** Convert this `AsyncLookupDoFn.Try` to a Scala `Try`. */
28+
def asScala: Try[A] =
29+
if (self.isSuccess) Success(self.get()) else Failure(self.getException)
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2020 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.spotify.scio.transforms
18+
19+
import com.spotify.scio.util.ParallelLimitedFn
20+
import com.twitter.chill.ClosureCleaner
21+
import org.apache.beam.sdk.transforms.DoFn
22+
23+
class ParallelCollectFn[T, U](parallelism: Int)(pfn: PartialFunction[T, U])
24+
extends ParallelLimitedFn[T, U](parallelism) {
25+
val isDefined: T => Boolean = ClosureCleaner.clean(pfn.isDefinedAt(_)) // defeat closure
26+
val g: PartialFunction[T, U] = ClosureCleaner.clean(pfn) // defeat closure
27+
def parallelProcessElement(c: DoFn[T, U]#ProcessContext): Unit =
28+
if (isDefined(c.element())) {
29+
c.output(g(c.element()))
30+
}
31+
}
32+
33+
class ParallelFilterFn[T](parallelism: Int)(f: T => Boolean)
34+
extends ParallelLimitedFn[T, T](parallelism) {
35+
val g: T => Boolean = ClosureCleaner.clean(f) // defeat closure
36+
def parallelProcessElement(c: DoFn[T, T]#ProcessContext): Unit =
37+
if (g(c.element())) {
38+
c.output(c.element())
39+
}
40+
}
41+
42+
class ParallelMapFn[T, U](parallelism: Int)(f: T => U)
43+
extends ParallelLimitedFn[T, U](parallelism) {
44+
val g: T => U = ClosureCleaner.clean(f) // defeat closure
45+
def parallelProcessElement(c: DoFn[T, U]#ProcessContext): Unit =
46+
c.output(g(c.element()))
47+
}
48+
49+
class ParallelFlatMapFn[T, U](parallelism: Int)(f: T => TraversableOnce[U])
50+
extends ParallelLimitedFn[T, U](parallelism: Int) {
51+
val g: T => TraversableOnce[U] = ClosureCleaner.clean(f) // defeat closure
52+
def parallelProcessElement(c: DoFn[T, U]#ProcessContext): Unit = {
53+
val i = g(c.element()).toIterator
54+
while (i.hasNext) c.output(i.next())
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2020 Spotify AB
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.spotify.scio.transforms
18+
19+
import com.spotify.scio.transforms.DoFnWithResource.ResourceType
20+
21+
import com.twitter.chill.ClosureCleaner
22+
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
23+
import org.apache.beam.sdk.transforms.DoFn
24+
25+
class CollectFnWithResource[T, U, R] private[transforms] (
26+
resource: => R,
27+
resourceType: ResourceType,
28+
pfn: PartialFunction[(R, T), U]
29+
) extends DoFnWithResource[T, U, R] {
30+
override def getResourceType: ResourceType = resourceType
31+
32+
override def createResource: R = resource
33+
34+
val isDefined: ((R, T)) => Boolean = ClosureCleaner.clean(pfn.isDefinedAt) // defeat closure
35+
val g: PartialFunction[(R, T), U] = ClosureCleaner.clean(pfn)
36+
@ProcessElement
37+
def processElement(c: DoFn[T, U]#ProcessContext): Unit =
38+
if (isDefined((getResource, c.element()))) {
39+
c.output(g((getResource, c.element())))
40+
}
41+
}
42+
43+
class MapFnWithResource[T, U, R] private[transforms] (
44+
resource: => R,
45+
resourceType: ResourceType,
46+
f: (R, T) => U
47+
) extends DoFnWithResource[T, U, R] {
48+
override def getResourceType: ResourceType = resourceType
49+
50+
override def createResource: R = resource
51+
52+
val g: (R, T) => U = ClosureCleaner.clean(f)
53+
54+
@ProcessElement
55+
def processElement(c: DoFn[T, U]#ProcessContext): Unit =
56+
c.output(g(getResource, c.element()))
57+
}
58+
59+
class FlatMapFnWithResource[T, U, R] private[transforms] (
60+
resource: => R,
61+
resourceType: ResourceType,
62+
f: (R, T) => TraversableOnce[U]
63+
) extends DoFnWithResource[T, U, R] {
64+
override def getResourceType: ResourceType = resourceType
65+
66+
override def createResource: R = resource
67+
68+
val g: (R, T) => TraversableOnce[U] = ClosureCleaner.clean(f)
69+
@ProcessElement
70+
def processElement(c: DoFn[T, U]#ProcessContext): Unit = {
71+
val i = g(getResource, c.element()).toIterator
72+
while (i.hasNext) c.output(i.next())
73+
}
74+
}
75+
76+
class FilterFnWithResource[T, R] private[transforms] (
77+
resource: => R,
78+
resourceType: ResourceType,
79+
f: (R, T) => Boolean
80+
) extends DoFnWithResource[T, T, R] {
81+
override def getResourceType: ResourceType = resourceType
82+
83+
override def createResource: R = resource
84+
85+
val g: (R, T) => Boolean = ClosureCleaner.clean(f)
86+
@ProcessElement
87+
def processElement(c: DoFn[T, T]#ProcessContext): Unit =
88+
if (g(getResource, c.element())) {
89+
c.output(c.element())
90+
}
91+
}

0 commit comments

Comments
 (0)