This is an automated email from the ASF dual-hosted git repository.
sunlan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/groovy.git The following commit(s) were added to refs/heads/master by this push: new 21d07fb Tweak partition cache policy further 21d07fb is described below commit 21d07fb796a3cf5388d5a4cdd9df80cbba9e4bda Author: Daniel Sun <[hidden email]> AuthorDate: Sat Jan 9 14:34:46 2021 +0800 Tweak partition cache policy further --- .../collection/runtime/QueryableCollection.java | 42 ++++++++++++++++++---- .../test/org/apache/groovy/ginq/GinqTest.groovy | 2 +- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java index 95515ff..faa3081 100644 --- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java +++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java @@ -511,10 +511,13 @@ class QueryableCollection<T> implements Queryable<T>, Serializable { public <U extends Comparable<? 
super U>> Window<T> over(Tuple2<T, Long> currentRecord, WindowDefinition<T, U> windowDefinition) { this.makeReusable(); final Tuple3<String, String, String> idTuple = (Tuple3<String, String, String>) windowDefinition.getId(); // (partitionId, orderId, windowDefinitionId) - Partition<Tuple2<T, Long>> partition = - from(Collections.singletonList(currentRecord)).innerHashJoin( - partitionCache.computeIfAbsent(idTuple.getV1(), partitionId -> { - long[] rn = new long[] { 1L }; + final String partitionId = idTuple.getV1(); + + Partition<Tuple2<T, Long>> partition = partitionCache.computeIfAbsent( + new PartitionCacheKey(windowDefinition.partitionBy().apply(currentRecord.getV1()), partitionId), + partitionCacheKey -> from(Collections.singletonList(currentRecord)).innerHashJoin( + allPartitionCache.computeIfAbsent(partitionId, pid -> { + long[] rn = new long[]{1L}; List<Tuple2<T, Long>> listWithIndex = this.toList().stream() .map(e -> Tuple.tuple(e, rn[0]++)) @@ -528,11 +531,12 @@ class QueryableCollection<T> implements Queryable<T>, Serializable { ((QueryableCollection) q).makeReusable(); } return q; - }), a -> windowDefinition.partitionBy().apply(a.getV1()), Tuple2::getV1 + }), a -> partitionCacheKey.partitionKey, Tuple2::getV1 ).select((e, q) -> e.getV2().getV2()) .stream() .findFirst() - .orElse(Partition.emptyPartition()); + .orElse(Partition.emptyPartition()) + ); final String orderId = idTuple.getV2(); final SortedPartitionCacheKey<T> sortedPartitionCacheKey = new SortedPartitionCacheKey<>(partition, orderId); @@ -544,6 +548,29 @@ class QueryableCollection<T> implements Queryable<T>, Serializable { return WindowImpl.newInstance(currentRecord, sortedPartition, windowDefinition); } + private static class PartitionCacheKey { + private final Object partitionKey; + private final String partitionId; + + public PartitionCacheKey(Object partitionKey, String partitionId) { + this.partitionKey = partitionKey; + this.partitionId = partitionId; + } + + @Override + public 
boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof PartitionCacheKey)) return false; + PartitionCacheKey that = (PartitionCacheKey) o; + return partitionKey.equals(that.partitionKey) && partitionId.equals(that.partitionId); + } + + @Override + public int hashCode() { + return Objects.hash(partitionKey, partitionId); + } + } + private static class SortedPartitionCacheKey<T> { private final Partition<Tuple2<T, Long>> partition; private final String orderId; @@ -639,7 +666,8 @@ class QueryableCollection<T> implements Queryable<T>, Serializable { return AsciiTableMaker.makeAsciiTable(this); } - private final Map<String, Queryable<Tuple2<?, Partition<Tuple2<T, Long>>>>> partitionCache = new ConcurrentHashMap<>(4); + private final Map<String, Queryable<Tuple2<?, Partition<Tuple2<T, Long>>>>> allPartitionCache = new ConcurrentHashMap<>(4); + private final Map<PartitionCacheKey, Partition<Tuple2<T, Long>>> partitionCache = new ConcurrentHashMap<>(4); private final Map<SortedPartitionCacheKey<T>, Partition<Tuple2<T, Long>>> sortedPartitionCache = new ConcurrentHashMap<>(4); private Stream<T> sourceStream; private volatile Iterable<T> sourceIterable; diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy index 1e021c3..f0fc89c 100644 --- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy +++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy @@ -4870,7 +4870,7 @@ class GinqTest { @Test void "testGinq - window - 15"() { - assertScript ''' + assertGinqScript ''' // tag::ginq_winfunction_05[] assert [['a', null], ['b', 'a'], ['aa', null], ['bb', 'aa']] == GQ { from s in ['a', 'b', 'aa', 'bb'] |
Free forum by Nabble | Edit this page