public interface PublishStoreResizeHandler
Implementation Example:
public class MaxSizeResizeHandler implements PublishStoreResizeHandler, ConnectionStateListener { protected long maxSize; // Maximum allowed size for the publish store protected long timeout; // Timeout for store flushing protected int maxFlushAttempts; // Maximum number of attempts to flush the store protected volatile boolean connected = false; // Keeps track of the connection state public MaxSizeResizeHandler (long maxSize, long timeout, int maxFlushAttempts) { this.maxSize = maxSize; this.timeout = timeout; this.maxFlushAttempts = maxFlushAttempts; } @Overide public boolean invoke(Store store, long newSize) { try { if(newSize <= maxSize) return true; // Use your preferred logging API instead of println() calls. System.out.println("\n------- Resize Handler Called And Over Max Size --------"); long startCount=store.unpersistedCount(); System.out.println("Unpersisted count before flush: " + startCount); // We've reached max size. Try to flush store instead of growing it. long count=startCount; int tries=0; while(count > 0 && tries++ < maxFlushAttempts) { System.out.println("Store not empty. Unpersisted msg count = " + count); System.out.println("Flush attempt = " + tries + ", connected status = " + connected); try { store.flush(timeout); } catch(TimedOutException e) { /* ignore */ } count=store.unpersistedCount(); } long endCount=store.unpersistedCount(); System.out.println("Unpersisted count after flush attempt(s): " + endCount + ", connected = " + connected); } catch(Exception e) { e.printStackTrace(); } // Let the store know it can't grow. Hopefully space was freed up. return false; } @Overide public void connectionStateChanged(int state) { switch(state) { case ConnectionStateListener.Connected: case ConnectionStateListener.LoggedOn: case ConnectionStateListener.HeartbeatInitiated: case ConnectionStateListener.PublishReplayed: case ConnectionStateListener.Resubscribed: connected = true; break; case ConnectionStateListener.Disconnected: case ConnectionStateListener.Shutdown: connected = false; break; } } }
How to use your resize handler:
client = new HAClient("PubClient"); Store pubStore = new MemoryPublishStore(100); MaxSizeResizeHandler handler = new MaxSizeResizeHandler(maxSize, timeout, maxFlushAttempts); pubStore.setResizeHandler(handler); client.setPublishStore(pubStore); client.addConnectionStateListener(handler);
You can implement your custom resize handler by implementing the
PublishStoreResizeHandler
interface and then setting it using the
BlockPublishStore.setResizeHandler(PublishStoreResizeHandler)
method.
Modifier and Type | Method and Description |
---|---|
boolean |
invoke(Store store,
long size)
Called by the
Store when it wants to attempt a resize. |
boolean invoke(Store store, long size)
Store
when it wants to attempt a resize. Rather
than allow a resize request to grow the store, and implementation
could choose to flush the store (to allow space to be freed up)
at the expense of publishing performance. If the store's unpersisted
count doesn't decrease after a flush times out, it could indicate
the client is disconnected or a server-side issue such as a
synchronous replication destination is down.store
- The store sending the resize request.size
- The new size the store would like to resize to.