[groovy] branch master updated: Tweak partition cache policy further

Previous Topic Next Topic
 
Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[groovy] branch master updated: Tweak partition cache policy further

Daniel.Sun
This is an automated email from the ASF dual-hosted git repository.

sunlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/groovy.git


The following commit(s) were added to refs/heads/master by this push:
     new 21d07fb  Tweak partition cache policy further
21d07fb is described below

commit 21d07fb796a3cf5388d5a4cdd9df80cbba9e4bda
Author: Daniel Sun <[hidden email]>
AuthorDate: Sat Jan 9 14:34:46 2021 +0800

    Tweak partition cache policy further
---
 .../collection/runtime/QueryableCollection.java    | 42 ++++++++++++++++++----
 .../test/org/apache/groovy/ginq/GinqTest.groovy    |  2 +-
 2 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
index 95515ff..faa3081 100644
--- a/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
+++ b/subprojects/groovy-ginq/src/main/groovy/org/apache/groovy/ginq/provider/collection/runtime/QueryableCollection.java
@@ -511,10 +511,13 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
     public <U extends Comparable<? super U>> Window<T> over(Tuple2<T, Long> currentRecord, WindowDefinition<T, U> windowDefinition) {
         this.makeReusable();
         final Tuple3<String, String, String> idTuple = (Tuple3<String, String, String>) windowDefinition.getId(); // (partitionId, orderId, windowDefinitionId)
-        Partition<Tuple2<T, Long>> partition =
-                from(Collections.singletonList(currentRecord)).innerHashJoin(
-                        partitionCache.computeIfAbsent(idTuple.getV1(), partitionId -> {
-                            long[] rn = new long[] { 1L };
+        final String partitionId = idTuple.getV1();
+
+        Partition<Tuple2<T, Long>> partition = partitionCache.computeIfAbsent(
+                new PartitionCacheKey(windowDefinition.partitionBy().apply(currentRecord.getV1()), partitionId),
+                partitionCacheKey -> from(Collections.singletonList(currentRecord)).innerHashJoin(
+                        allPartitionCache.computeIfAbsent(partitionId, pid -> {
+                            long[] rn = new long[]{1L};
                             List<Tuple2<T, Long>> listWithIndex =
                                     this.toList().stream()
                                             .map(e -> Tuple.tuple(e, rn[0]++))
@@ -528,11 +531,12 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
                                 ((QueryableCollection) q).makeReusable();
                             }
                             return q;
-                        }), a -> windowDefinition.partitionBy().apply(a.getV1()), Tuple2::getV1
+                        }), a -> partitionCacheKey.partitionKey, Tuple2::getV1
                 ).select((e, q) -> e.getV2().getV2())
                         .stream()
                         .findFirst()
-                        .orElse(Partition.emptyPartition());
+                        .orElse(Partition.emptyPartition())
+        );
 
         final String orderId = idTuple.getV2();
         final SortedPartitionCacheKey<T> sortedPartitionCacheKey = new SortedPartitionCacheKey<>(partition, orderId);
@@ -544,6 +548,29 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
         return WindowImpl.newInstance(currentRecord, sortedPartition, windowDefinition);
     }
 
+    private static class PartitionCacheKey {
+        private final Object partitionKey;
+        private final String partitionId;
+
+        public PartitionCacheKey(Object partitionKey, String partitionId) {
+            this.partitionKey = partitionKey;
+            this.partitionId = partitionId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof PartitionCacheKey)) return false;
+            PartitionCacheKey that = (PartitionCacheKey) o;
+            return partitionKey.equals(that.partitionKey) && partitionId.equals(that.partitionId);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partitionKey, partitionId);
+        }
+    }
+
     private static class SortedPartitionCacheKey<T> {
         private final Partition<Tuple2<T, Long>> partition;
         private final String orderId;
@@ -639,7 +666,8 @@ class QueryableCollection<T> implements Queryable<T>, Serializable {
         return AsciiTableMaker.makeAsciiTable(this);
     }
 
-    private final Map<String, Queryable<Tuple2<?, Partition<Tuple2<T, Long>>>>> partitionCache = new ConcurrentHashMap<>(4);
+    private final Map<String, Queryable<Tuple2<?, Partition<Tuple2<T, Long>>>>> allPartitionCache = new ConcurrentHashMap<>(4);
+    private final Map<PartitionCacheKey, Partition<Tuple2<T, Long>>> partitionCache = new ConcurrentHashMap<>(4);
     private final Map<SortedPartitionCacheKey<T>, Partition<Tuple2<T, Long>>> sortedPartitionCache = new ConcurrentHashMap<>(4);
     private Stream<T> sourceStream;
     private volatile Iterable<T> sourceIterable;
diff --git a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
index 1e021c3..f0fc89c 100644
--- a/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
+++ b/subprojects/groovy-ginq/src/spec/test/org/apache/groovy/ginq/GinqTest.groovy
@@ -4870,7 +4870,7 @@ class GinqTest {
 
     @Test
     void "testGinq - window - 15"() {
-        assertScript '''
+        assertGinqScript '''
 // tag::ginq_winfunction_05[]
             assert [['a', null], ['b', 'a'], ['aa', null], ['bb', 'aa']] == GQ {
                 from s in ['a', 'b', 'aa', 'bb']

Apache Groovy committer & PMC member

Blog: http://blog.sunlan.me
Twitter: @daniel_sun