
Synchronizing local in-memory caches for multiple servers

Caching in distributed applications can pose a complex problem. When multiple instances of an application are running simultaneously (typically websites or web services deployed in the cloud or web farms), it's crucial to ensure that the cache is appropriately invalidated for all application instances.

A common solution is to use a centralized cache server (or a cluster of cache servers), such as a Redis server or a Redis cluster. However, operating a cache server or cluster incurs a cost that may not be justified for small or medium-sized applications, such as a small business website.

An alternative to a shared distributed cache is to maintain a local in-memory cache in each application instance: each instance caches data in its own local cache. However, when one application instance modifies a piece of data, it must ensure that all instances remove the relevant items from their local caches. This process is known as distributed cache invalidation. It can be achieved easily and inexpensively with a publish/subscribe (pub/sub) message bus, which is much less costly than a cache cluster.
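To make the idea of distributed cache invalidation concrete, here is a minimal, self-contained sketch. It does not use Metalama or a real message bus: InvalidationBus and Node are hypothetical names introduced only for illustration, and the bus is an in-process stand-in for a service such as Azure Service Bus or Redis Pub/Sub.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical in-process stand-in for a pub/sub message bus.
// It is an assumption for illustration, not a Metalama type.
public sealed class InvalidationBus
{
    private readonly List<Action<string>> _subscribers = new();
    private readonly object _sync = new();

    public void Subscribe( Action<string> onInvalidated )
    {
        lock ( this._sync )
        {
            this._subscribers.Add( onInvalidated );
        }
    }

    // Notifies every subscriber that the item with the given key is stale.
    public void Publish( string key )
    {
        Action<string>[] snapshot;

        lock ( this._sync )
        {
            snapshot = this._subscribers.ToArray();
        }

        foreach ( var subscriber in snapshot )
        {
            subscriber( key );
        }
    }
}

// Hypothetical application instance: it owns a local cache and
// removes items from it whenever an invalidation message arrives.
public sealed class Node
{
    private readonly ConcurrentDictionary<string, string> _localCache = new();
    private readonly ConcurrentDictionary<string, string> _database;
    private readonly InvalidationBus _bus;

    public int DatabaseReads { get; private set; }

    public Node( ConcurrentDictionary<string, string> database, InvalidationBus bus )
    {
        this._database = database;
        this._bus = bus;
        bus.Subscribe( key => this._localCache.TryRemove( key, out _ ) );
    }

    // Reads from the local cache and falls back to the shared database on a miss.
    public string Get( string key )
        => this._localCache.GetOrAdd(
            key,
            k =>
            {
                this.DatabaseReads++;

                return this._database[k];
            } );

    // Writes to the shared database, then asks all nodes to drop their cached copy.
    public void Update( string key, string value )
    {
        this._database[key] = value;
        this._bus.Publish( key );
    }
}
```

In this sketch, two Node objects sharing one database and one InvalidationBus each cache a value on their first read. When either node calls Update, both local caches drop the item, so the next read on any node goes back to the database.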

Metalama makes it easy to add pub/sub cache invalidation to an existing Metalama Caching setup, using either Azure Service Bus or Redis Pub/Sub.

Warning

With pub/sub invalidation, there may be some latency in the invalidation mechanism, i.e., different application instances running on different servers may see different data for a few dozen milliseconds. While generally harmless when application clients are affinitized to one server (for instance, with geo-based request routing), it can cause issues when the same client can randomly connect to different servers.
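The stale-read window described in this warning can be illustrated with a small deterministic sketch. It is not based on Metalama: DelayedBus and Server are hypothetical names, and the explicit Deliver call is an assumption that stands in for the network latency of a real message bus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical bus that holds invalidation messages until Deliver() is called,
// which makes the latency of a real message bus explicit.
public sealed class DelayedBus
{
    private readonly List<Action<string>> _subscribers = new();
    private readonly Queue<string> _inFlight = new();

    public void Subscribe( Action<string> handler ) => this._subscribers.Add( handler );

    public void Publish( string key ) => this._inFlight.Enqueue( key );

    // Simulates the pending messages arriving at every subscriber.
    public void Deliver()
    {
        while ( this._inFlight.Count > 0 )
        {
            var key = this._inFlight.Dequeue();

            foreach ( var handler in this._subscribers )
            {
                handler( key );
            }
        }
    }
}

// Hypothetical server with its own local cache in front of a shared database.
public sealed class Server
{
    private readonly Dictionary<string, decimal> _localCache = new();
    private readonly Dictionary<string, decimal> _database;
    private readonly DelayedBus _bus;

    public Server( Dictionary<string, decimal> database, DelayedBus bus )
    {
        this._database = database;
        this._bus = bus;
        bus.Subscribe( key => this._localCache.Remove( key ) );
    }

    public decimal GetPrice( string key )
    {
        if ( !this._localCache.TryGetValue( key, out var price ) )
        {
            price = this._database[key];
            this._localCache[key] = price;
        }

        return price;
    }

    public void SetPrice( string key, decimal price )
    {
        this._database[key] = price;
        this._localCache.Remove( key ); // The writer drops its own cached copy immediately.
        this._bus.Publish( key );       // Other servers are notified only after delivery.
    }
}
```

If two Server objects have both cached a price and the first one calls SetPrice, the second one keeps returning the old price until Deliver runs. A client that is always routed to the first server never observes this, whereas a client that alternates between the two servers can see the price appear to go backward during that window.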

Using Azure Service Bus

Configuring a topic

The first step is to create a topic. To do this using the Microsoft Azure portal, follow these steps:

  1. Navigate to the Microsoft Azure portal, open the Service Bus panel and create a new Topic. Choose a small value for the time-to-live setting, such as 30 seconds. Visit the Microsoft Azure website for more details.

  2. In the Microsoft Azure portal, create a Shared access policy and include the Send, Listen, and Manage rights. Your application will use this policy.

  3. Copy the primary or secondary connection string to your clipboard.

Configuring your application

  1. Add caching to your application as described in Getting started with Metalama Caching.

  2. Add a reference to the Metalama.Patterns.Caching.Backends.Azure NuGet package.

  3. Return to the code that initializes Metalama Caching by calling serviceCollection.AddMetalamaCaching or CachingService.Create. Call the WithBackend method and supply a delegate that calls the Memory method. Then, chain a call to WithAzureSynchronization and pass the topic connection string as an argument.

    builder.Services.AddMetalamaCaching(
        caching =>
            caching.WithBackend(
                backend =>
                    backend.Memory().WithAzureSynchronization( connectionString ) ) );

  4. We recommend initializing the caching service during your application's initialization sequence; otherwise, the service will be initialized lazily upon its first use. Retrieve the ICachingService interface from the IServiceProvider and call the InitializeAsync method.

    await app.Services.GetRequiredService<ICachingService>().InitializeAsync();

Warning

Ensure that the ICachingService is properly disposed of before the application exits. Failure to do so may leave some background cache write operations unprocessed, leading to cache inconsistency.

Example: A Distributed Application Synchronized by Azure Service Bus

The following example simulates a multi-instance application. For ease of testing, both instances live in the same process. Both instances read and write to a shared database simulated by a concurrent dictionary, which sits behind an in-memory cache. These two cache instances are synchronized using WithAzureSynchronization.

using System;
using Metalama.Documentation.Helpers.ConsoleApp;
using System.Threading;
using System.Threading.Tasks;

namespace Doc.AzureSynchronized
{
    public sealed class ConsoleMain : IAsyncConsoleMain
    {
        private readonly ProductCatalogue _productCatalogue;
        private readonly string _appName;

        public ConsoleMain( ProductCatalogue productCatalogue, IConsoleHost host )
        {
            this._productCatalogue = productCatalogue;
            this._appName = host.Arguments[0];
        }

        public async Task ExecuteAsync()
        {
            // Force running in parallel.
            await Task.Yield();

            for ( var i = 0; i < 3; i++ )
            {
                for ( var j = 0; j < 3; j++ )
                {
                    // Getting the product.
                    var corn = this._productCatalogue.GetProduct( "corn" );
                    Console.WriteLine( $"{this._appName} reads and gets {corn}." );
                    await Task.Delay( 20 + Random.Shared.Next( 4 ) );
                }

                // Updating the product.
                var updatedCorn = new Product( "corn", 100 + Random.Shared.Next( 20 ), $"Updated by {this._appName}, i={i}" );
                Console.WriteLine( $"{this._appName} update {updatedCorn}." );
                this._productCatalogue.Update( updatedCorn );
            }

            Console.WriteLine( $"In total, CloudCalculator in {this._appName} performed {this._productCatalogue.DbOperationCount} database operation(s)." );
        }
    }
}

using Metalama.Documentation.Helpers.ConsoleApp;
using Metalama.Documentation.Helpers.Security;
using Metalama.Patterns.Caching;
using Metalama.Patterns.Caching.Backends.Azure;
using Metalama.Patterns.Caching.Building;
using Microsoft.Extensions.DependencyInjection;
using System.Threading.Tasks;

namespace Doc.AzureSynchronized
{
    internal static class Program
    {
        public static async Task Main()
        {
            // We simulate two applications running in parallel.
            var app1 = RunApp( "App1" );
            var app2 = RunApp( "App2" );

            await Task.WhenAll( app1, app2 );
        }

        private static async Task RunApp( string name )
        {
            var builder = ConsoleApp.CreateBuilder();

            // Get the connection string.
            var connectionString = Secrets.Get( "CacheInvalidationTestServiceBusConnectionString" );

            // Add the caching service.
            builder.Services.AddMetalamaCaching(
                caching =>
                    caching.WithBackend(
                        backend =>
                            backend.Memory().WithAzureSynchronization( connectionString ) ) );

            // Add other components as usual.
            builder.Services.AddAsyncConsoleMain<ConsoleMain>();
            builder.Services.AddSingleton<ProductCatalogue>();

            // Build the application.
            await using var app = builder.Build( new[] { name } );

            await app.Services.GetRequiredService<ICachingService>().InitializeAsync();

            // Run the application.
            await app.RunAsync();
        }
    }
}
Source Code
using Metalama.Patterns.Caching.Aspects;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

using Metalama.Patterns.Caching;

namespace Doc.AzureSynchronized
{
    public record Product( string Id, decimal Price, string? Remarks = null );

    public sealed class ProductCatalogue
    {
        // This instance is intentionally shared between both app instances to simulate
        // a shared database.
        private static readonly ConcurrentDictionary<string, Product> _dbSimulator
            = new() { ["corn"] = new Product( "corn", 100, "Initial record." ) };

        public int DbOperationCount { get; private set; }

        [Cache]
        public Product GetProduct( string productId )
        {
            Console.WriteLine( $"Getting the product of {productId} from database." );

            this.DbOperationCount++;

            return _dbSimulator[productId];
        }

        public void Update( Product product )
        {
            if ( !_dbSimulator.ContainsKey( product.Id ) )
            {
                throw new KeyNotFoundException();
            }

            Console.WriteLine( $"Updating the product {product.Id}." );

            this.DbOperationCount++;
            _dbSimulator[product.Id] = product;

            // The _cachingService field is not declared in this source file; it only appears
            // in the transformed code. The documentation page therefore reports error CS1061
            // ('ProductCatalogue' does not contain a definition for '_cachingService') on this line.
            this._cachingService.Invalidate( this.GetProduct, product.Id );
        }
    }
}
Transformed Code
using Metalama.Patterns.Caching.Aspects;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

using Metalama.Patterns.Caching;
using Metalama.Patterns.Caching.Aspects.Helpers;
using System.Reflection;

namespace Doc.AzureSynchronized
{
    public record Product(string Id, decimal Price, string? Remarks = null);

    public sealed class ProductCatalogue
    {
        // This instance is intentionally shared between both app instances to simulate
        // a shared database.
        private static readonly ConcurrentDictionary<string, Product> _dbSimulator
            = new() { ["corn"] = new Product("corn", 100, "Initial record.") };

        public int DbOperationCount { get; private set; }

        [Cache]
        public Product GetProduct(string productId)
        {
            static object? Invoke(object? instance, object?[] args)
            {
                return ((ProductCatalogue)instance).GetProduct_Source((string)args[0]);
            }

            return _cachingService!.GetFromCacheOrExecute<Product>(_cacheRegistration_GetProduct!, this, new object[] { productId }, Invoke);
        }

        private Product GetProduct_Source(string productId)
        {
            Console.WriteLine($"Getting the product of {productId} from database.");

            this.DbOperationCount++;

            return _dbSimulator[productId];
        }

        public void Update(Product product)
        {
            if (!_dbSimulator.ContainsKey(product.Id))
            {
                throw new KeyNotFoundException();
            }

            Console.WriteLine($"Updating the product {product.Id}.");

            this.DbOperationCount++;
            _dbSimulator[product.Id] = product;

            this._cachingService.Invalidate(this.GetProduct, product.Id);
        }

        private static readonly CachedMethodMetadata _cacheRegistration_GetProduct;
        private ICachingService _cachingService;

        static ProductCatalogue()
        {
            ProductCatalogue._cacheRegistration_GetProduct = CachedMethodMetadata.Register(
                RunTimeHelpers.ThrowIfMissing(
                    typeof(ProductCatalogue).GetMethod("GetProduct", BindingFlags.Public | BindingFlags.Instance, null, new[] { typeof(string) }, null)!,
                    "ProductCatalogue.GetProduct(string)"),
                new CachedMethodConfiguration() { AbsoluteExpiration = null, AutoReload = null, IgnoreThisParameter = null, Priority = null, ProfileName = (string?)null, SlidingExpiration = null },
                true);
        }

        public ProductCatalogue(ICachingService? cachingService = default)
        {
            this._cachingService = cachingService ?? throw new System.ArgumentNullException(nameof(cachingService));
        }
    }
}
Getting the product of corn from database.
App2 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }.
Getting the product of corn from database.
App1 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }.
App2 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }.
App1 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }.
App2 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }.
App1 reads and gets Product { Id = corn, Price = 100, Remarks = Initial record. }.
App2 update Product { Id = corn, Price = 102, Remarks = Updated by App2, i=0 }.
Updating the product corn.
Getting the product of corn from database.
App2 reads and gets Product { Id = corn, Price = 102, Remarks = Updated by App2, i=0 }.
App1 update Product { Id = corn, Price = 105, Remarks = Updated by App1, i=0 }.
Updating the product corn.
Getting the product of corn from database.
App1 reads and gets Product { Id = corn, Price = 105, Remarks = Updated by App1, i=0 }.
App2 reads and gets Product { Id = corn, Price = 102, Remarks = Updated by App2, i=0 }.
App1 reads and gets Product { Id = corn, Price = 105, Remarks = Updated by App1, i=0 }.
App2 reads and gets Product { Id = corn, Price = 102, Remarks = Updated by App2, i=0 }.
App1 reads and gets Product { Id = corn, Price = 105, Remarks = Updated by App1, i=0 }.
App2 update Product { Id = corn, Price = 116, Remarks = Updated by App2, i=1 }.
Updating the product corn.
Getting the product of corn from database.
App2 reads and gets Product { Id = corn, Price = 116, Remarks = Updated by App2, i=1 }.
App1 update Product { Id = corn, Price = 103, Remarks = Updated by App1, i=1 }.
Updating the product corn.
Getting the product of corn from database.
App1 reads and gets Product { Id = corn, Price = 103, Remarks = Updated by App1, i=1 }.
App2 reads and gets Product { Id = corn, Price = 116, Remarks = Updated by App2, i=1 }.
App1 reads and gets Product { Id = corn, Price = 103, Remarks = Updated by App1, i=1 }.
App2 reads and gets Product { Id = corn, Price = 116, Remarks = Updated by App2, i=1 }.
App1 reads and gets Product { Id = corn, Price = 103, Remarks = Updated by App1, i=1 }.
App2 update Product { Id = corn, Price = 106, Remarks = Updated by App2, i=2 }.
Updating the product corn.
In total, CloudCalculator in App2 performed 6 database operation(s).
App1 update Product { Id = corn, Price = 113, Remarks = Updated by App1, i=2 }.
Updating the product corn.
In total, CloudCalculator in App1 performed 6 database operation(s).

Using Redis Pub/Sub

If you already use Redis as the storage for Metalama Caching, adding another layer of invalidation is unnecessary because the Redis caching back-end already handles it. However, if you have a Redis cluster but don't want to use it for caching, you can still use it for cache invalidation. This can be the case when your Redis server's latency is too high for caching but acceptable for cache invalidation.

No configuration on your Redis server is necessary to use it for cache synchronization.

  1. Add caching to your application as described in Getting started with Metalama Caching.

  2. Add a reference to the Metalama.Patterns.Caching.Backends.Redis NuGet package.

  3. Return to the code that initializes Metalama Caching by calling serviceCollection.AddMetalamaCaching or CachingService.Create. Call the WithBackend method and supply a delegate that calls the Memory method. Then, chain a call to WithRedisSynchronization and pass an instance of RedisCacheSynchronizerConfiguration.

  4. We recommend initializing the caching service during your application's initialization sequence; otherwise, the service will be initialized lazily upon its first use. Retrieve the ICachingService interface from the IServiceProvider and call the InitializeAsync method.

Warning

Ensure that the ICachingService is properly disposed of before the application exits. Failure to do so may leave some background cache write operations unprocessed, leading to cache inconsistency.