From 4974b51c53aa6e24dd8bb2db1fa69db8cc171721 Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 10 Apr 2020 10:52:13 -0700 Subject: [PATCH] rls: LruCache interface and implementation (#6799) --- .../grpc/rls/internal/LinkedHashLruCache.java | 386 ++++++++++++++++++ .../java/io/grpc/rls/internal/LruCache.java | 95 +++++ .../rls/internal/LinkedHashLruCacheTest.java | 255 ++++++++++++ 3 files changed, 736 insertions(+) create mode 100644 rls/src/main/java/io/grpc/rls/internal/LinkedHashLruCache.java create mode 100644 rls/src/main/java/io/grpc/rls/internal/LruCache.java create mode 100644 rls/src/test/java/io/grpc/rls/internal/LinkedHashLruCacheTest.java diff --git a/rls/src/main/java/io/grpc/rls/internal/LinkedHashLruCache.java b/rls/src/main/java/io/grpc/rls/internal/LinkedHashLruCache.java new file mode 100644 index 0000000000..b2a122e050 --- /dev/null +++ b/rls/src/main/java/io/grpc/rls/internal/LinkedHashLruCache.java @@ -0,0 +1,386 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.rls.internal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import io.grpc.internal.TimeProvider; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A LinkedHashLruCache implements least recently used caching where it supports access order lru + * cache eviction while allowing entry level expiration time. When the cache reaches max capacity, + * LruCache try to remove up to one already expired entries. If it doesn't find any expired entries, + * it will remove based on access order of entry. On top of this, LruCache also proactively removes + * expired entries based on configured time interval. + */ +@ThreadSafe +abstract class LinkedHashLruCache implements LruCache { + + private final Object lock = new Object(); + + @GuardedBy("lock") + private final LinkedHashMap delegate; + private final PeriodicCleaner periodicCleaner; + private final TimeProvider timeProvider; + private final EvictionListener evictionListener; + private final AtomicLong estimatedSizeBytes = new AtomicLong(); + private long estimatedMaxSizeBytes; + + LinkedHashLruCache( + final long estimatedMaxSizeBytes, + @Nullable final EvictionListener evictionListener, + int cleaningInterval, + TimeUnit cleaningIntervalUnit, + ScheduledExecutorService ses, + final TimeProvider timeProvider) { + checkState(estimatedMaxSizeBytes > 0, "max estimated cache size should be positive"); + this.estimatedMaxSizeBytes = estimatedMaxSizeBytes; + this.evictionListener = new SizeHandlingEvictionListener(evictionListener); + this.timeProvider = checkNotNull(timeProvider, "timeProvider"); + delegate = new LinkedHashMap( + // rough estimate or minimum hashmap default + Math.max((int) (estimatedMaxSizeBytes / 1000), 16), + /* loadFactor= */ 0.75f, + /* accessOrder= */ true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + if (estimatedSizeBytes.get() <= LinkedHashLruCache.this.estimatedMaxSizeBytes) { + return false; + } + + // first, remove at most 1 expired entry + boolean removed = cleanupExpiredEntries(1, timeProvider.currentTimeNanos()); + // handles size based eviction if necessary no expired entry + boolean shouldRemove = + !removed && shouldInvalidateEldestEntry(eldest.getKey(), eldest.getValue().value); + if (shouldRemove) { + // remove entry by us to make sure lruIterator and cache is in sync + LinkedHashLruCache.this.invalidate(eldest.getKey(), EvictionType.SIZE); + } + return false; + } + }; + periodicCleaner = new PeriodicCleaner(ses, cleaningInterval, cleaningIntervalUnit).start(); + } + + /** + * Determines if the eldest entry should be kept or not when the cache size limit is reached. Note + * that LruCache is access level and the eldest is determined by access pattern. + */ + @SuppressWarnings("unused") + protected boolean shouldInvalidateEldestEntry(K eldestKey, V eldestValue) { + return true; + } + + /** Determines if the entry is already expired or not. */ + protected abstract boolean isExpired(K key, V value, long nowNanos); + + /** + * Returns estimated size of entry to keep track. If it always returns 1, the max size bytes + * behaves like max number of entry (default behavior). + */ + @SuppressWarnings("unused") + protected int estimateSizeOf(K key, V value) { + return 1; + } + + /** Updates size for given key if entry exists. It is useful if the cache value is mutated. */ + public void updateEntrySize(K key) { + synchronized (lock) { + SizedValue entry = readInternal(key); + if (entry == null) { + return; + } + int prevSize = entry.size; + int newSize = estimateSizeOf(key, entry.value); + entry.size = newSize; + estimatedSizeBytes.addAndGet(newSize - prevSize); + } + } + + @Override + @Nullable + public final V cache(K key, V value) { + checkNotNull(key, "key"); + checkNotNull(value, "value"); + SizedValue existing; + int size = estimateSizeOf(key, value); + synchronized (lock) { + estimatedSizeBytes.addAndGet(size); + existing = delegate.put(key, new SizedValue(size, value)); + if (existing != null) { + evictionListener.onEviction(key, existing, EvictionType.REPLACED); + } + } + return existing == null ? null : existing.value; + } + + @Override + @Nullable + @CheckReturnValue + public final V read(K key) { + SizedValue entry = readInternal(key); + if (entry != null) { + return entry.value; + } + return null; + } + + @Nullable + @CheckReturnValue + private SizedValue readInternal(K key) { + checkNotNull(key, "key"); + synchronized (lock) { + SizedValue existing = delegate.get(key); + if (existing != null && isExpired(key, existing.value, timeProvider.currentTimeNanos())) { + invalidate(key, EvictionType.EXPIRED); + return null; + } + return existing; + } + } + + @Override + @Nullable + public final V invalidate(K key) { + return invalidate(key, EvictionType.EXPLICIT); + } + + @Nullable + private V invalidate(K key, EvictionType cause) { + checkNotNull(key, "key"); + checkNotNull(cause, "cause"); + synchronized (lock) { + SizedValue existing = delegate.remove(key); + if (existing != null) { + evictionListener.onEviction(key, existing, cause); + } + return existing == null ? null : existing.value; + } + } + + @Override + public final void invalidateAll(Iterable keys) { + checkNotNull(keys, "keys"); + synchronized (lock) { + for (K key : keys) { + SizedValue existing = delegate.remove(key); + if (existing != null) { + evictionListener.onEviction(key, existing, EvictionType.EXPLICIT); + } + } + } + } + + @Override + @CheckReturnValue + public final boolean hasCacheEntry(K key) { + // call readInternal to filter already expired entry in the cache + return readInternal(key) != null; + } + + /** Returns shallow copied values in the cache. */ + public final List values() { + synchronized (lock) { + List list = new ArrayList<>(delegate.size()); + for (SizedValue value : delegate.values()) { + list.add(value.value); + } + return Collections.unmodifiableList(list); + } + } + + /** + * Resizes cache. If new size is smaller than current estimated size, it will free up space by + * removing expired entries and removing oldest entries by LRU order. + */ + public final void resize(int newSizeBytes) { + long now = timeProvider.currentTimeNanos(); + synchronized (lock) { + long estimatedSizeBytesCopy = estimatedMaxSizeBytes; + this.estimatedMaxSizeBytes = newSizeBytes; + if (estimatedSizeBytesCopy <= newSizeBytes) { + // new size is larger no need to do cleanup + return; + } + // cleanup expired entries + cleanupExpiredEntries(now); + + // cleanup eldest entry until new size limit + Iterator> lruIter = delegate.entrySet().iterator(); + while (lruIter.hasNext() && estimatedMaxSizeBytes > this.estimatedSizeBytes.get()) { + Map.Entry entry = lruIter.next(); + lruIter.remove(); + // eviction listener will update the estimatedSizeBytes + evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.SIZE); + } + } + } + + @Override + @CheckReturnValue + public final int estimatedSize() { + synchronized (lock) { + return delegate.size(); + } + } + + private boolean cleanupExpiredEntries(long now) { + return cleanupExpiredEntries(Integer.MAX_VALUE, now); + } + + // maxExpiredEntries is by number of entries + private boolean cleanupExpiredEntries(int maxExpiredEntries, long now) { + checkArgument(maxExpiredEntries > 0, "maxExpiredEntries must be positive"); + boolean removedAny = false; + synchronized (lock) { + Iterator> lruIter = delegate.entrySet().iterator(); + while (lruIter.hasNext() && maxExpiredEntries > 0) { + Map.Entry entry = lruIter.next(); + if (isExpired(entry.getKey(), entry.getValue().value, now)) { + lruIter.remove(); + evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.EXPIRED); + removedAny = true; + maxExpiredEntries--; + } + } + } + return removedAny; + } + + @Override + public final void close() { + synchronized (lock) { + periodicCleaner.stop(); + doClose(); + delegate.clear(); + } + } + + protected void doClose() {} + + /** Periodically cleans up the AsyncRequestCache. */ + private final class PeriodicCleaner { + + private final ScheduledExecutorService ses; + private final int interval; + private final TimeUnit intervalUnit; + private ScheduledFuture scheduledFuture; + + PeriodicCleaner(ScheduledExecutorService ses, int interval, TimeUnit intervalUnit) { + this.ses = checkNotNull(ses, "ses"); + checkState(interval > 0, "interval must be positive"); + this.interval = interval; + this.intervalUnit = checkNotNull(intervalUnit, "intervalUnit"); + } + + PeriodicCleaner start() { + checkState(scheduledFuture == null, "cleaning task can be started only once"); + this.scheduledFuture = + ses.scheduleAtFixedRate(new CleaningTask(), interval, interval, intervalUnit); + return this; + } + + void stop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduledFuture = null; + } + } + + private class CleaningTask implements Runnable { + + @Override + public void run() { + cleanupExpiredEntries(timeProvider.currentTimeNanos()); + } + } + } + + /** A {@link EvictionListener} keeps track of size. */ + private final class SizeHandlingEvictionListener implements EvictionListener { + + private final EvictionListener delegate; + + SizeHandlingEvictionListener(@Nullable EvictionListener delegate) { + this.delegate = delegate; + } + + @Override + public void onEviction(K key, SizedValue value, EvictionType cause) { + estimatedSizeBytes.addAndGet(-1 * value.size); + if (delegate != null) { + delegate.onEviction(key, value.value, cause); + } + } + } + + private final class SizedValue { + volatile int size; + final V value; + + SizedValue(int size, V value) { + this.size = size; + this.value = value; + } + + @Override + public boolean equals(Object o) { + // NOTE: the size doesn't affect equality + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LinkedHashLruCache.SizedValue that = (LinkedHashLruCache.SizedValue) o; + return Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + // NOTE: the size doesn't affect hashCode + return Objects.hash(value); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("size", size) + .add("value", value) + .toString(); + } + } +} diff --git a/rls/src/main/java/io/grpc/rls/internal/LruCache.java b/rls/src/main/java/io/grpc/rls/internal/LruCache.java new file mode 100644 index 0000000000..07909919f1 --- /dev/null +++ b/rls/src/main/java/io/grpc/rls/internal/LruCache.java @@ -0,0 +1,95 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.rls.internal; + +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; + +/** An LruCache is a cache with least recently used eviction. */ +interface LruCache { + + /** + * Populates a cache entry. If the cache entry for given key already exists, the value will be + * replaced to the new value. + * + * @return the previous value associated with key, otherwise {@code null} + */ + @Nullable + V cache(K key, V value); + + /** + * Returns cached value for given key if exists, otherwise {@code null}. This operation doesn't + * return already expired cache entry. + */ + @Nullable + @CheckReturnValue + V read(K key); + + /** + * Invalidates an entry for given key if exists. This operation will trigger {@link + * EvictionListener} with {@link EvictionType#EXPLICIT}. + * + * @return the previous value associated with key, otherwise {@code null} + */ + @Nullable + V invalidate(K key); + + /** + * Invalidates cache entries for given keys. This operation will trigger {@link EvictionListener} + * with {@link EvictionType#EXPLICIT}. + */ + void invalidateAll(Iterable keys); + + /** Returns {@code true} if given key is cached. */ + @CheckReturnValue + boolean hasCacheEntry(K key); + + /** + * Returns the estimated number of entry of the cache. Note that the size can be larger than its + * true size, because there might be already expired cache. + */ + @CheckReturnValue + int estimatedSize(); + + /** Closes underlying resources. */ + void close(); + + /** A Listener notifies cache eviction events. */ + interface EvictionListener { + + /** + * Notifies the listener when any cache entry is evicted. Implementation can assume that this + * method is called serially. Implementation should be non blocking, for long running task + * consider offloading the task to {@link java.util.concurrent.Executor}. + */ + void onEviction(K key, V value, EvictionType cause); + } + + /** Type of cache eviction. */ + enum EvictionType { + /** Explicitly removed by user. */ + EXPLICIT, + /** Evicted due to size limit. */ + SIZE, + /** Evicted due to entry expired. */ + EXPIRED, + /** Removed due to error. */ + ERROR, + /** Evicted by replacement. */ + REPLACED + } +} diff --git a/rls/src/test/java/io/grpc/rls/internal/LinkedHashLruCacheTest.java b/rls/src/test/java/io/grpc/rls/internal/LinkedHashLruCacheTest.java new file mode 100644 index 0000000000..e8252a5109 --- /dev/null +++ b/rls/src/test/java/io/grpc/rls/internal/LinkedHashLruCacheTest.java @@ -0,0 +1,255 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.rls.internal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.internal.TimeProvider; +import io.grpc.rls.internal.LruCache.EvictionListener; +import io.grpc.rls.internal.LruCache.EvictionType; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class LinkedHashLruCacheTest { + + private static final int MAX_SIZE = 5; + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + + private final DoNotUseFakeScheduledService fakeScheduledService = + mock(DoNotUseFakeScheduledService.class, CALLS_REAL_METHODS); + private final TimeProvider timeProvider = fakeScheduledService.getFakeTicker(); + + @Mock + private EvictionListener evictionListener; + private LinkedHashLruCache cache; + + @Before + public void setUp() { + this.cache = new LinkedHashLruCache( + MAX_SIZE, + evictionListener, + 10, + TimeUnit.NANOSECONDS, + fakeScheduledService, + timeProvider) { + @Override + protected boolean isExpired(Integer key, Entry value, long nowNanos) { + return value.expireTime <= nowNanos; + } + }; + } + + @Test + public void eviction_size() { + for (int i = 1; i <= MAX_SIZE; i++) { + cache.cache(i, new Entry("Entry" + i, Long.MAX_VALUE)); + } + cache.cache(MAX_SIZE + 1, new Entry("should kick the first", Long.MAX_VALUE)); + + verify(evictionListener).onEviction(1, new Entry("Entry1", Long.MAX_VALUE), EvictionType.SIZE); + assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE); + } + + @Test + public void size() { + Entry entry1 = new Entry("Entry0", timeProvider.currentTimeNanos() + 10); + Entry entry2 = new Entry("Entry1", timeProvider.currentTimeNanos() + 20); + cache.cache(0, entry1); + cache.cache(1, entry2); + assertThat(cache.estimatedSize()).isEqualTo(2); + + assertThat(cache.invalidate(0)).isEqualTo(entry1); + assertThat(cache.estimatedSize()).isEqualTo(1); + + assertThat(cache.invalidate(1)).isEqualTo(entry2); + assertThat(cache.estimatedSize()).isEqualTo(0); + } + + @Test + public void eviction_expire() { + Entry toBeEvicted = new Entry("Entry0", timeProvider.currentTimeNanos() + 10); + Entry survivor = new Entry("Entry1", timeProvider.currentTimeNanos() + 20); + cache.cache(0, toBeEvicted); + cache.cache(1, survivor); + + fakeScheduledService.advance(10, TimeUnit.NANOSECONDS); + verify(evictionListener).onEviction(0, toBeEvicted, EvictionType.EXPIRED); + + fakeScheduledService.advance(10, TimeUnit.NANOSECONDS); + verify(evictionListener).onEviction(1, survivor, EvictionType.EXPIRED); + } + + @Test + public void eviction_explicit() { + Entry toBeEvicted = new Entry("Entry0", timeProvider.currentTimeNanos() + 10); + Entry survivor = new Entry("Entry1", timeProvider.currentTimeNanos() + 20); + cache.cache(0, toBeEvicted); + cache.cache(1, survivor); + + assertThat(cache.invalidate(0)).isEqualTo(toBeEvicted); + + verify(evictionListener).onEviction(0, toBeEvicted, EvictionType.EXPLICIT); + } + + @Test + public void eviction_replaced() { + Entry toBeEvicted = new Entry("Entry0", timeProvider.currentTimeNanos() + 10); + Entry survivor = new Entry("Entry1", timeProvider.currentTimeNanos() + 20); + cache.cache(0, toBeEvicted); + cache.cache(0, survivor); + + verify(evictionListener).onEviction(0, toBeEvicted, EvictionType.REPLACED); + } + + @Test + public void eviction_size_shouldEvictAlreadyExpired() { + for (int i = 1; i <= MAX_SIZE; i++) { + // last two entries are <= current time (already expired) + cache.cache(i, new Entry("Entry" + i, timeProvider.currentTimeNanos() + MAX_SIZE - i - 1)); + } + cache.cache(MAX_SIZE + 1, new Entry("should kick the first", Long.MAX_VALUE)); + + // should remove MAX_SIZE-1 instead of MAX_SIZE because MAX_SIZE is accessed later + verify(evictionListener) + .onEviction(eq(MAX_SIZE - 1), any(Entry.class), eq(EvictionType.EXPIRED)); + assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE); + } + + @Test + public void eviction_get_shouldNotReturnAlreadyExpired() { + for (int i = 1; i <= MAX_SIZE; i++) { + // last entry is already expired when added + cache.cache(i, new Entry("Entry" + i, timeProvider.currentTimeNanos() + MAX_SIZE - i)); + } + + assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE); + assertThat(cache.read(MAX_SIZE)).isNull(); + assertThat(cache.estimatedSize()).isEqualTo(MAX_SIZE - 1); + verify(evictionListener).onEviction(eq(MAX_SIZE), any(Entry.class), eq(EvictionType.EXPIRED)); + } + + private static final class Entry { + String value; + long expireTime; + + Entry(String value, long expireTime) { + this.value = value; + this.expireTime = expireTime; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Entry entry = (Entry) o; + return expireTime == entry.expireTime && Objects.equals(value, entry.value); + } + + @Override + public int hashCode() { + return Objects.hash(value, expireTime); + } + } + + /** + * A fake minimal implementation of ScheduledExecutorService *only* supports scheduledAtFixedRate + * with a lot of limitation / assumptions. Only intended to be used in this test with + * CALL_REAL_METHODS mock. + */ + private abstract static class DoNotUseFakeScheduledService implements ScheduledExecutorService { + + private long currTimeNanos; + private long period; + private long nextRun; + private AtomicReference command; + + @Override + public final ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + // hack to initialize + if (this.command == null) { + this.command = new AtomicReference<>(); + } + checkState(this.command.get() == null, "only can schedule one"); + checkState(period > 0, "period should be positive"); + checkState(initialDelay >= 0, "initial delay should be >= 0"); + if (initialDelay == 0) { + initialDelay = period; + command.run(); + } + this.command.set(checkNotNull(command, "command")); + this.nextRun = checkNotNull(unit, "unit").toNanos(initialDelay) + currTimeNanos; + this.period = unit.toNanos(period); + return mock(ScheduledFuture.class); + } + + TimeProvider getFakeTicker() { + return new TimeProvider() { + @Override + public long currentTimeNanos() { + return currTimeNanos; + } + }; + } + + void advance(long delta, TimeUnit unit) { + // if scheduled command, only can advance the ticker to trigger at most 1 event + boolean scheduled = command != null && command.get() != null; + long deltaNanos = unit.toNanos(delta); + if (scheduled) { + checkArgument( + (this.currTimeNanos + deltaNanos) < (nextRun + 2 * period), + "Cannot advance ticker because more than one repeated tasks will run"); + long finalTime = this.currTimeNanos + deltaNanos; + if (finalTime >= nextRun) { + nextRun += period; + this.currTimeNanos = nextRun; + command.get().run(); + } + this.currTimeNanos = finalTime; + } else { + this.currTimeNanos += deltaNanos; + } + } + } +}