package org.apache.commons.jcs3.auxiliary.remote;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;
import java.rmi.Naming;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheAttributes;
import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheClient;
import org.apache.commons.jcs3.auxiliary.remote.behavior.IRemoteCacheListener;
import org.apache.commons.jcs3.engine.CacheStatus;
import org.apache.commons.jcs3.engine.CacheWatchRepairable;
import org.apache.commons.jcs3.engine.ZombieCacheServiceNonLocal;
import org.apache.commons.jcs3.engine.ZombieCacheWatch;
import org.apache.commons.jcs3.engine.behavior.ICacheObserver;
import org.apache.commons.jcs3.engine.behavior.ICacheServiceNonLocal;
import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.logging.behavior.ICacheEventLogger;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;

/**
 * An instance of RemoteCacheManager corresponds to one remote connection of a specific host and
 * port. All RemoteCacheManager instances are monitored by the singleton RemoteCacheMonitor
 * monitoring daemon for error detection and recovery.
 * <p>
 * Getting an instance of the remote cache has the effect of getting a handle on the remote server.
 * Listeners are not registered with the server until a cache is requested from the manager.
 */
public class RemoteCacheManager
{
    /** The logger */
    private static final Log log = LogManager.getLog( RemoteCacheManager.class );

    /** Contains instances of RemoteCacheNoWait managed by a RemoteCacheManager instance. */
    private final ConcurrentMap<String, RemoteCacheNoWait<?, ?>> caches =
        new ConcurrentHashMap<>();

    /** The event logger. */
    private final ICacheEventLogger cacheEventLogger;

    /** The serializer. */
    private final IElementSerializer elementSerializer;

    /** Handle to the remote cache service; or a zombie handle if failed to connect. */
    private ICacheServiceNonLocal<?, ?> remoteService;

    /**
     * Wrapper of the remote cache watch service; or wrapper of a zombie service if failed to
     * connect.
     */
    private final CacheWatchRepairable remoteWatch;

    /** The cache manager listeners will need to use to get a cache. */
    private final ICompositeCacheManager cacheMgr;

    /** For error notification */
    private final RemoteCacheMonitor monitor;

    /** The naming URL used to look up the remote service. */
    private final String registry;

    /** Outcome of the most recent lookup attempt made by {@link #canFixCaches()}. */
    private boolean canFix = true;

    /**
     * Constructs an instance with the given remote connection parameters. If the connection
     * cannot be made, "zombie" services will be temporarily used until a successful re-connection
     * is made by the monitoring daemon.
     * <p>
     * @param cattr cache attributes
     * @param cacheMgr the cache hub
     * @param monitor the cache monitor thread for error notifications
     * @param cacheEventLogger the event logger passed on to the caches created by this manager
     * @param elementSerializer the serializer passed on to the listeners and caches created by
     *        this manager
     */
    protected RemoteCacheManager( final IRemoteCacheAttributes cattr, final ICompositeCacheManager cacheMgr,
                                final RemoteCacheMonitor monitor,
                                final ICacheEventLogger cacheEventLogger, final IElementSerializer elementSerializer)
    {
        this.cacheMgr = cacheMgr;
        this.monitor = monitor;
        this.cacheEventLogger = cacheEventLogger;
        this.elementSerializer = elementSerializer;
        this.remoteWatch = new CacheWatchRepairable();

        this.registry = RemoteUtils.getNamingURL(cattr.getRemoteLocation(), cattr.getRemoteServiceName());

        try
        {
            lookupRemoteService();
        }
        catch (final IOException e)
        {
            log.error("Could not find server", e);
            // Notify the cache monitor about the error, and kick off the
            // recovery process.
            monitor.notifyError();
        }
    }

    /**
     * Looks up the remote service from the registry and installs it as the active service and
     * cache watch. On any failure the "zombie" service and watch are installed instead before
     * the exception is thrown.
     *
     * @throws IOException if the remote service could not be found; the original failure is
     *         attached as the cause
     */
    protected void lookupRemoteService() throws IOException
    {
        log.info( "Looking up server [{0}]", registry );
        try
        {
            final Object obj = Naming.lookup( registry );
            log.info( "Server found: {0}", obj );

            // Successful connection to the remote server.
            this.remoteService = (ICacheServiceNonLocal<?, ?>) obj;
            log.debug( "Remote Service = {0}", remoteService );
            remoteWatch.setCacheWatch( (ICacheObserver) remoteService );
        }
        catch ( final Exception ex )
        {
            // Failed to connect to the remote server.
            // Configure this RemoteCacheManager instance to use the "zombie"
            // services.
            this.remoteService = new ZombieCacheServiceNonLocal<>();
            remoteWatch.setCacheWatch( new ZombieCacheWatch() );
            throw new IOException( "Problem finding server at [" + registry + "]", ex );
        }
    }

    /**
     * Adds the remote cache listener to the underlying cache-watch service, but only when the
     * attributes are configured to receive events.
     * <p>
     * @param <K> the cache key type
     * @param <V> the cache value type
     * @param cattr the cache attributes; supplies the cache name and the receive flag
     * @param listener the listener to register for the cache named by {@code cattr}
     * @throws IOException if the cache-watch service fails to register the listener
     */
    public <K, V> void addRemoteCacheListener( final IRemoteCacheAttributes cattr, final IRemoteCacheListener<K, V> listener )
        throws IOException
    {
        if ( cattr.isReceive() )
        {
            // The message takes three arguments, so it needs three placeholders.
            log.info( "The remote cache is configured to receive events from the remote server. "
                + "We will register a listener. remoteWatch = {0} | IRemoteCacheListener = {1}"
                + " | cacheName = {2}", remoteWatch, listener, cattr.getCacheName() );

            remoteWatch.addCacheListener( cattr.getCacheName(), listener );
        }
        else
        {
            log.info( "The remote cache is configured to NOT receive events from the remote server. "
                    + "We will NOT register a listener." );
        }
    }

    /**
     * Removes a listener. When the primary recovers the failover must deregister itself for a
     * region. The failover runner will call this method to de-register. We do not want to deregister
     * all listeners to a remote server, in case a failover is a primary of another region. Having
     * one regions failover act as another servers primary is not currently supported.
     * <p>
     * @param cattr the cache attributes; the cache name selects the cache whose listener is removed
     * @throws IOException if the cache-watch service fails to remove the listener
     */
    public void removeRemoteCacheListener( final IRemoteCacheAttributes cattr )
        throws IOException
    {
        final RemoteCacheNoWait<?, ?> cache = caches.get( cattr.getCacheName() );
        if ( cache != null )
        {
            removeListenerFromCache(cache);
        }
        else
        {
            if ( cattr.isReceive() )
            {
                log.warn( "Trying to deregister Cache Listener that was never registered." );
            }
            else
            {
                log.debug( "Since the remote cache is configured to not receive, "
                    + "there is no listener to deregister." );
            }
        }
    }

    /**
     * Common helper: deregisters the listener of the given cache from the cache-watch service.
     *
     * @param cache the cache whose listener is removed
     * @throws IOException if the cache-watch service fails to remove the listener
     */
    private void removeListenerFromCache(final RemoteCacheNoWait<?, ?> cache) throws IOException
    {
        final IRemoteCacheClient<?, ?> rc = cache.getRemoteCache();
        log.debug( "Found cache for [{0}], deregistering listener.", cache::getCacheName);
        // could also store the listener for a server in the manager.
        remoteWatch.removeCacheListener(cache.getCacheName(), rc.getListener());
    }

    /**
     * Gets a RemoteCacheNoWait from the RemoteCacheManager. The RemoteCacheNoWait objects are
     * identified by the cache name value of the RemoteCacheAttributes object.
     * <p>
     * If the client is configured to register a listener, this call results on a listener being
     * created if one isn't already registered with the remote cache for this region.
     * <p>
     * @param <K> the cache key type
     * @param <V> the cache value type
     * @param cattr the cache attributes; the cache name is the lookup key
     * @return the existing cache for that name, or a newly created one
     */
    @SuppressWarnings("unchecked") // Need to cast because of common map for all caches
    public <K, V> RemoteCacheNoWait<K, V> getCache( final IRemoteCacheAttributes cattr )
    {
        // might want to do some listener sanity checking here.
        return (RemoteCacheNoWait<K, V>) caches.computeIfAbsent(cattr.getCacheName(),
                key -> newRemoteCacheNoWait(cattr));
    }

    /**
     * Create new RemoteCacheNoWait instance
     *
     * @param <K> the cache key type
     * @param <V> the cache value type
     * @param cattr the cache configuration
     * @return the instance
     */
    protected <K, V> RemoteCacheNoWait<K, V> newRemoteCacheNoWait(final IRemoteCacheAttributes cattr)
    {
        final RemoteCacheNoWait<K, V> remoteCacheNoWait;
        // create a listener first and pass it to the remotecache
        // sender.
        RemoteCacheListener<K, V> listener = null;
        try
        {
            listener = new RemoteCacheListener<>( cattr, cacheMgr, elementSerializer );
            addRemoteCacheListener( cattr, listener );
        }
        catch ( final IOException e )
        {
            // A failed registration is logged only; the cache is still created below.
            log.error( "Problem adding listener. RemoteCacheListener = {0}",
                    listener, e );
        }

        @SuppressWarnings("unchecked")
        final IRemoteCacheClient<K, V> remoteCacheClient =
            new RemoteCache<>(cattr, (ICacheServiceNonLocal<K, V>) remoteService, listener, monitor);
        remoteCacheClient.setCacheEventLogger( cacheEventLogger );
        remoteCacheClient.setElementSerializer( elementSerializer );

        remoteCacheNoWait = new RemoteCacheNoWait<>( remoteCacheClient );
        remoteCacheNoWait.setCacheEventLogger( cacheEventLogger );
        remoteCacheNoWait.setElementSerializer( elementSerializer );

        return remoteCacheNoWait;
    }

    /**
     * Shutdown all: deregisters the listener of every managed cache, disposes the cache, and
     * finally empties the cache map. A failure on one cache is logged and does not stop the
     * remaining caches from being released.
     */
    public void release()
    {
        caches.forEach((name, cache) -> {
            try
            {
                log.info("freeCache [{0}]", name);

                try
                {
                    removeListenerFromCache(cache);
                }
                finally
                {
                    // Dispose even when deregistration fails: the map is cleared below,
                    // so this is the last chance to release the cache.
                    cache.dispose();
                }
            }
            catch ( final IOException ex )
            {
                log.error("Problem releasing {0}", name, ex);
            }
        });
        caches.clear();
    }

    /**
     * Fixes up all the caches managed by this cache manager. Does nothing if the last call to
     * {@link #canFixCaches()} could not reach the remote server.
     */
    public void fixCaches()
    {
        if ( !canFix )
        {
            return;
        }

        log.info( "Fixing caches. ICacheServiceNonLocal {0} | IRemoteCacheObserver {1}",
                remoteService, remoteWatch );

        // Only caches currently in ERROR state are handed the current service handle.
        caches.values().stream()
            .filter(cache -> cache.getStatus() == CacheStatus.ERROR)
            .forEach(cache -> cache.fixCache(remoteService));

        if ( log.isInfoEnabled() )
        {
            final String msg = "Remote connection to " + registry + " resumed.";
            if ( cacheEventLogger != null )
            {
                cacheEventLogger.logApplicationEvent( "RemoteCacheManager", "fix", msg );
            }
            log.info( msg );
        }
    }

    /**
     * Returns true if the connection to the remote host can be
     * successfully re-established.
     * <p>
     * Every call performs a fresh lookup, and the result reflects that attempt only, so a
     * server that was unreachable earlier is picked up again once it is back.
     * <p>
     * @return true if the lookup of the remote service succeeded
     */
    public boolean canFixCaches()
    {
        try
        {
            lookupRemoteService();
            // Reset the flag on success; otherwise a single failed attempt would
            // disable recovery for the lifetime of this manager.
            canFix = true;
        }
        catch (final IOException e)
        {
            log.error("Could not find server", e);
            canFix = false;
        }

        return canFix;
    }
}