Skip to content

Commit 701b7ac

Browse files
committed
KEYCLOAK-5371 More stable cross-dc tests
1 parent 7cf5204 commit 701b7ac

6 files changed

Lines changed: 125 additions & 42 deletions

File tree

common/src/main/java/org/keycloak/common/util/Retry.java

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
package org.keycloak.common.util;
1919

20+
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
21+
2022
/**
2123
* @author <a href="mailto:sthorger@redhat.com">Stian Thorgersen</a>
2224
*/
2325
public class Retry {
2426

27+
2528
/**
26-
* Runs the given {@code runnable} at most {@code retryCount} times until it passes,
29+
* Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
2730
* leaving {@code intervalMillis} milliseconds between the invocations.
2831
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
2932
* @param runnable
@@ -32,14 +35,14 @@ public class Retry {
3235
* @return Index of the first successful invocation, starting from 0.
3336
*/
3437
public static int execute(Runnable runnable, int attemptsCount, long intervalMillis) {
35-
int executionIndex = 0;
38+
int iteration = 0;
3639
while (true) {
3740
try {
3841
runnable.run();
39-
return executionIndex;
42+
return iteration;
4043
} catch (RuntimeException | AssertionError e) {
4144
attemptsCount--;
42-
executionIndex++;
45+
iteration++;
4346
if (attemptsCount > 0) {
4447
try {
4548
if (intervalMillis > 0) {
@@ -56,8 +59,56 @@ public static int execute(Runnable runnable, int attemptsCount, long intervalMil
5659
}
5760
}
5861

62+
63+
/**
64+
* Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
65+
* leaving some increasing random delay milliseconds between the invocations. It uses Exponential backoff + jitter algorithm
66+
* to compute the delay. More details https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
67+
*
68+
* The base for delay is specified by {@code intervalBaseMillis} number.
69+
*
70+
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
71+
*
72+
* @param runnable
73+
* @param attemptsCount Total number of attempts to execute the {@code runnable}
74+
* @param intervalBaseMillis base for the exponential backoff + jitter
75+
*
76+
* @return Index of the first successful invocation, starting from 0.
77+
*/
78+
public static int executeWithBackoff(AdvancedRunnable runnable, int attemptsCount, int intervalBaseMillis) {
79+
int iteration = 0;
80+
while (true) {
81+
try {
82+
runnable.run(iteration);
83+
return iteration;
84+
} catch (RuntimeException | AssertionError e) {
85+
attemptsCount--;
86+
iteration++;
87+
if (attemptsCount > 0) {
88+
try {
89+
if (intervalBaseMillis > 0) {
90+
int delay = computeBackoffInterval(intervalBaseMillis, iteration);
91+
Thread.sleep(delay);
92+
}
93+
} catch (InterruptedException ie) {
94+
ie.addSuppressed(e);
95+
throw new RuntimeException(ie);
96+
}
97+
} else {
98+
throw e;
99+
}
100+
}
101+
}
102+
}
103+
104+
private static int computeBackoffInterval(int base, int iteration) {
105+
int iterationBase = base * (int)Math.pow(2, iteration);
106+
return new Random().nextInt(iterationBase);
107+
}
108+
109+
59110
/**
60-
* Runs the given {@code runnable} at most {@code retryCount} times until it passes,
111+
* Runs the given {@code runnable} at most {@code attemptsCount} times until it passes,
61112
* leaving {@code intervalMillis} milliseconds between the invocations.
62113
* The runnable is reexecuted if it throws a {@link RuntimeException} or {@link AssertionError}.
63114
* @param supplier
@@ -66,11 +117,13 @@ public static int execute(Runnable runnable, int attemptsCount, long intervalMil
66117
* @return Value generated by the {@code supplier}.
67118
*/
68119
public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalMillis) {
120+
int iteration = 0;
69121
while (true) {
70122
try {
71-
return supplier.get();
123+
return supplier.get(iteration);
72124
} catch (RuntimeException | AssertionError e) {
73125
attemptsCount--;
126+
iteration++;
74127
if (attemptsCount > 0) {
75128
try {
76129
if (intervalMillis > 0) {
@@ -89,7 +142,18 @@ public static <T> T call(Supplier<T> supplier, int attemptsCount, long intervalM
89142

90143

91144
/**
92-
* Needed here just because java.util.function.Supplier defined from Java 8
145+
* Runnable, which provides some additional info (iteration for now)
146+
*/
147+
public interface AdvancedRunnable {
148+
/**
 * Executes a single attempt of the task.
 *
 * @param iteration index of the current attempt; {@code executeWithBackoff} passes 0 for the
 *                  first attempt and increases the value by one after every failed attempt
 */
149+
void run(int iteration);
150+
151+
}
152+
153+
/**
154+
* Needed here because:
155+
* - java.util.function.Supplier defined from Java 8
156+
* - Adds some additional info (current iteration)
93157
*/
94158
public interface Supplier<T> {
95159

@@ -98,7 +162,7 @@ public interface Supplier<T> {
98162
*
99163
* @return a result
100164
*/
101-
T get();
165+
T get(int iteration);
102166
}
103167

104168

model/infinispan/src/main/java/org/keycloak/cluster/infinispan/InfinispanNotificationsManager.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,8 @@ private void hotrodEventReceived(String key) {
222222

223223
});
224224
} catch (RejectedExecutionException ree) {
225-
logger.warnf("Rejected submitting of the event for key: %s. Probably server going to shutdown", key);
226-
227-
if (logger.isDebugEnabled()) {
228-
logger.debug(ree.getMessage(), ree);
229-
}
225+
logger.errorf("Rejected submitting of the event for key: %s. Value: %s, Server going to shutdown or pool exhausted. Pool: %s", key, workCache.get(key), listenersExecutor.toString());
226+
throw ree;
230227
}
231228
}
232229

model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/InfinispanCodeToTokenStoreProvider.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.function.Supplier;
2323

24+
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
2425
import org.infinispan.commons.api.BasicCache;
2526
import org.jboss.logging.Logger;
2627
import org.keycloak.common.util.Retry;
@@ -49,24 +50,20 @@ public boolean putIfAbsent(UUID codeId) {
4950

5051
int lifespanInSeconds = session.getContext().getRealm().getAccessCodeLifespan();
5152

52-
boolean codeAlreadyExists = Retry.call(() -> {
53-
54-
try {
55-
BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get();
56-
ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
57-
return existing == null;
58-
} catch (RuntimeException re) {
59-
if (logger.isDebugEnabled()) {
60-
logger.debugf(re, "Failed when adding code %s", codeId);
61-
}
62-
63-
// Rethrow the exception. Retry will take care of handle the exception and eventually retry the operation.
64-
throw re;
53+
try {
54+
BasicCache<UUID, ActionTokenValueEntity> cache = codeCache.get();
55+
ActionTokenValueEntity existing = cache.putIfAbsent(codeId, tokenValue, lifespanInSeconds, TimeUnit.SECONDS);
56+
return existing == null;
57+
} catch (HotRodClientException re) {
58+
// No need to retry. The HotRod client (remoteCache) already retries internally when a random network error happens.
59+
// In case of lock conflict, we don't want to retry anyway as there was likely an attempt to use the code from different place.
60+
if (logger.isDebugEnabled()) {
61+
logger.debugf(re, "Failed when adding code %s", codeId);
6562
}
6663

67-
}, 3, 0);
64+
return false;
65+
}
6866

69-
return codeAlreadyExists;
7067
}
7168

7269
@Override

model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/ClientListenerExecutorDecorator.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.RejectedExecutionException;
2425

2526
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
2627
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
2728
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
2829
import org.infinispan.client.hotrod.event.ClientEvent;
2930
import org.jboss.logging.Logger;
3031
import org.keycloak.common.util.MultivaluedHashMap;
32+
import org.keycloak.common.util.Time;
3133

3234
import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_CREATED;
3335
import static org.infinispan.client.hotrod.event.ClientEvent.Type.CLIENT_CACHE_ENTRY_REMOVED;
@@ -94,24 +96,40 @@ private void submit(MyClientEvent event, Runnable r) {
9496

9597
// Assume it's called from the synchronized block
9698
private void submitImpl(K key, MyClientEvent event, Runnable r) {
97-
logger.debugf("Submitting event to the executor: %s", event.toString());
99+
logger.debugf("Submitting event to the executor: %s . eventsInProgress size: %d, eventsQueue size: %d", event.toString(), eventsInProgress.size(), eventsQueue.size());
98100

99101
eventsInProgress.put(key, event);
100102

101103
Runnable decoratedRunnable = () -> {
104+
Long start = null;
102105
try {
106+
if (logger.isDebugEnabled()) {
107+
start = Time.currentTimeMillis();
108+
}
109+
103110
r.run();
104111
} finally {
105112
synchronized (lock) {
106-
logger.debugf("Finished processing event by the executor: %s", event.toString());
107113
eventsInProgress.remove(key);
108114

115+
if (logger.isDebugEnabled()) {
116+
long took = Time.currentTimeMillis() - start;
117+
logger.debugf("Finished processing event by the executor: %s, took: %d ms. EventsInProgress size: %d", event.toString(), took, eventsInProgress.size());
118+
}
119+
109120
pollQueue(key);
110121
}
111122
}
112123
};
113124

114-
decorated.submit(decoratedRunnable);
125+
try {
126+
decorated.submit(decoratedRunnable);
127+
} catch (RejectedExecutionException ree) {
128+
eventsInProgress.remove(key);
129+
130+
logger.errorf("Rejected execution of task for the event '%s' . Try to increase the pool size. Pool is '%s'", event.toString(), decorated.toString());
131+
throw ree;
132+
}
115133
}
116134

117135

model/infinispan/src/main/java/org/keycloak/models/sessions/infinispan/remotestore/RemoteCacheInvoker.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.keycloak.models.sessions.infinispan.remotestore;
1919

20+
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
2021
import org.keycloak.common.util.Retry;
2122
import org.keycloak.common.util.Time;
2223
import java.util.Collections;
@@ -69,7 +70,9 @@ public <K, V extends SessionEntity> void runTask(KeycloakSession kcSession, Real
6970
SessionUpdateTask.CrossDCMessageStatus status = task.getCrossDCMessageStatus(sessionWrapper);
7071

7172
if (status == SessionUpdateTask.CrossDCMessageStatus.NOT_NEEDED) {
72-
logger.debugf("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
73+
if (logger.isTraceEnabled()) {
74+
logger.tracef("Skip writing to remoteCache for entity '%s' of cache '%s' and operation '%s'", key, cacheName, operation);
75+
}
7376
return;
7477
}
7578

@@ -78,23 +81,25 @@ public <K, V extends SessionEntity> void runTask(KeycloakSession kcSession, Real
7881
// Double the timeout to ensure that entry won't expire on remoteCache in case that write of some entities to remoteCache is postponed (eg. userSession.lastSessionRefresh)
7982
final long maxIdleTimeMs = loadedMaxIdleTimeMs * 2;
8083

81-
logger.debugf("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
84+
if (logger.isTraceEnabled()) {
85+
logger.tracef("Running task '%s' on remote cache '%s' . Key is '%s'", operation, cacheName, key);
86+
}
8287

83-
Retry.execute(() -> {
88+
Retry.executeWithBackoff((int iteration) -> {
8489

8590
try {
8691
runOnRemoteCache(context.remoteCache, maxIdleTimeMs, key, task, sessionWrapper);
87-
} catch (RuntimeException re) {
92+
} catch (HotRodClientException re) {
8893
if (logger.isDebugEnabled()) {
89-
logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s' . Will try to retry the task",
90-
operation, cacheName, key);
94+
logger.debugf(re, "Failed running task '%s' on remote cache '%s' . Key: '%s', iteration '%s'. Will try to retry the task",
95+
operation, cacheName, key, iteration);
9196
}
9297

9398
// Rethrow the exception. Retry will take care of handling the exception and eventually retry the operation.
9499
throw re;
95100
}
96101

97-
}, 10, 0);
102+
}, 10, 10);
98103
}
99104

100105

@@ -146,15 +151,17 @@ private <K, V extends SessionEntity> void replace(RemoteCache<K, SessionEntityWr
146151
// Run task on the remote session
147152
task.runUpdate(session);
148153

149-
logger.debugf("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
154+
if (logger.isTraceEnabled()) {
155+
logger.tracef("Before replaceWithVersion. Entity to write version %d: %s", versioned.getVersion(), session);
156+
}
150157

151158
replaced = remoteCache.replaceWithVersion(key, SessionEntityWrapper.forTransport(session), versioned.getVersion(), lifespanMs, TimeUnit.MILLISECONDS, maxIdleMs, TimeUnit.MILLISECONDS);
152159

153160
if (!replaced) {
154161
logger.debugf("Failed to replace entity '%s' version %d. Will retry again", key, versioned.getVersion());
155162
} else {
156-
if (logger.isDebugEnabled()) {
157-
logger.debugf("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
163+
if (logger.isTraceEnabled()) {
164+
logger.tracef("Replaced entity version %d in remote cache: %s", versioned.getVersion(), session);
158165
}
159166
}
160167
}

services/src/main/java/org/keycloak/executors/DefaultExecutorsProviderFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class DefaultExecutorsProviderFactory implements ExecutorsProviderFactory
4545
protected static final Logger logger = Logger.getLogger(DefaultExecutorsProviderFactory.class);
4646

4747
private static final int DEFAULT_MIN_THREADS = 4;
48-
private static final int DEFAULT_MAX_THREADS = 16;
48+
private static final int DEFAULT_MAX_THREADS = 64;
4949

5050
private static final String MANAGED_EXECUTORS_SERVICE_JNDI_PREFIX = "java:jboss/ee/concurrency/executor/";
5151

0 commit comments

Comments
 (0)