diff --git a/src/OpenFeature.Contrib.Providers.Flagd/FlagdConfig.cs b/src/OpenFeature.Contrib.Providers.Flagd/FlagdConfig.cs index 42da879f7..22f005aae 100644 --- a/src/OpenFeature.Contrib.Providers.Flagd/FlagdConfig.cs +++ b/src/OpenFeature.Contrib.Providers.Flagd/FlagdConfig.cs @@ -18,7 +18,13 @@ public enum ResolverType /// locally for in-process evaluation. /// Evaluations are preformed in-process. /// - IN_PROCESS + IN_PROCESS, + /// + /// This is the in-process offline mode resolving type, where flags are fetched from a file and stored + /// locally for in-process evaluation. + /// Evaluations are preformed in-process. + /// + FILE, } /// @@ -147,6 +153,16 @@ public string SourceSelector set => _sourceSelector = value; } + /// + /// File source of flags to be used by offline mode. + /// Provide full path including directory and file name. + /// + public string OfflineFlagSourceFullPath + { + get => this._offlineFlagSourceFullPath; + set => this._offlineFlagSourceFullPath = value; + } + internal bool UseCertificate => _cert.Length > 0; private string _host; @@ -158,6 +174,7 @@ public string SourceSelector private int _maxCacheSize; private int _maxEventStreamRetries; private string _sourceSelector; + private string _offlineFlagSourceFullPath; private ResolverType _resolverType; internal FlagdConfig() @@ -327,6 +344,16 @@ public FlagdConfigBuilder WithSourceSelector(string sourceSelector) return this; } + /// + /// File source of flags to be used by offline mode. + /// Provide full path including directory and file name. + /// + public FlagdConfigBuilder OfflineFlagSourceFullPath(string path) + { + _config.OfflineFlagSourceFullPath = path; + return this; + } + /// /// Builds the FlagdConfig object. /// diff --git a/src/OpenFeature.Contrib.Providers.Flagd/FlagdProvider.cs b/src/OpenFeature.Contrib.Providers.Flagd/FlagdProvider.cs index 840f4df4d..da018979d 100644 --- a/src/OpenFeature.Contrib.Providers.Flagd/FlagdProvider.cs +++ b/src/OpenFeature.Contrib.Providers.Flagd/FlagdProvider.cs @@ -65,14 +65,11 @@ public FlagdProvider(FlagdConfig config) _config = config; - if (_config.ResolverType == ResolverType.IN_PROCESS) + _resolver = _config.ResolverType switch { - _resolver = new InProcessResolver(_config, EventChannel, _providerMetadata); - } - else - { - _resolver = new RpcResolver(config, EventChannel, _providerMetadata); - } + ResolverType.RPC => new RpcResolver(config, EventChannel, _providerMetadata), + _ => new InProcessResolver(_config, EventChannel, _providerMetadata) + }; } // just for testing, internal but visible in tests diff --git a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs index 57c829797..9a9ff711c 100644 --- a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs +++ b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/InProcessResolver.cs @@ -1,82 +1,70 @@ -using System.Net.Http; using System.Threading.Tasks; using OpenFeature.Model; using OpenFeature.Flagd.Grpc.Sync; using System; -using System.IO; -#if NET462_OR_GREATER -using System.Linq; -using System.Net.Security; -#endif -using System.Security.Cryptography.X509Certificates; -using Grpc.Net.Client; -#if NET8_0_OR_GREATER -using System.Net.Sockets; // needed for unix sockets -#endif using System.Threading; -using Grpc.Core; using Value = OpenFeature.Model.Value; using System.Threading.Channels; using OpenFeature.Constant; +using OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess.Storage; namespace OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess; internal class InProcessResolver : Resolver { - static readonly int InitialEventStreamRetryBaseBackoff = 1; - static readonly int MaxEventStreamRetryBackoff = 60; readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); - private readonly FlagSyncService.FlagSyncServiceClient _client; + private readonly Storage.Storage _storage; private readonly JsonEvaluator _evaluator; - private readonly Mutex _mtx; - private int _eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff; - private readonly FlagdConfig _config; private Thread _handleEventsThread; - private GrpcChannel _channel; private Channel _eventChannel; private Model.Metadata _providerMetadata; - private bool connected = false; internal InProcessResolver(FlagdConfig config, Channel eventChannel, Model.Metadata providerMetadata) { _eventChannel = eventChannel; _providerMetadata = providerMetadata; - _config = config; - _client = BuildClient(config, channel => new FlagSyncService.FlagSyncServiceClient(channel)); - _mtx = new Mutex(); _evaluator = new JsonEvaluator(config.SourceSelector); + this._storage = config.ResolverType switch + { + ResolverType.FILE => new FileStorage(config), + _ => new RpcStorage(config), + }; } internal InProcessResolver(FlagSyncService.FlagSyncServiceClient client, FlagdConfig config, Channel eventChannel, Model.Metadata providerMetadata) : this(config, eventChannel, providerMetadata) { - _client = client; + this._storage = new RpcStorage(client, config); } public Task Init() { - return Task.Run(() => + Task.Run(() => { var latch = new CountdownEvent(1); - _handleEventsThread = new Thread(async () => await HandleEvents(latch).ConfigureAwait(false)) + _handleEventsThread = new Thread(async () => + { + await _storage.Init().ConfigureAwait(false); + await HandleStorageEvents(latch).ConfigureAwait(false); + }) { IsBackground = true }; _handleEventsThread.Start(); latch.Wait(); - }).ContinueWith((task) => + }).ContinueWith(task => { if (task.IsFaulted) throw task.Exception; }); + return Task.CompletedTask; } public Task Shutdown() { _cancellationTokenSource.Cancel(); - return _channel?.ShutdownAsync().ContinueWith((t) => + return this._storage.Shutdown().ContinueWith(t => { - _channel.Dispose(); if (t.IsFaulted) throw t.Exception; - }); + });; } public Task> ResolveBooleanValueAsync(string flagKey, bool defaultValue, EvaluationContext context = null) @@ -104,230 +92,51 @@ public Task> ResolveStructureValueAsync(string flagKey, return Task.FromResult(_evaluator.ResolveStructureValueAsync(flagKey, defaultValue, context)); } - private async Task HandleEvents(CountdownEvent latch) + private async Task HandleStorageEvents(CountdownEvent latch) { CancellationToken token = _cancellationTokenSource.Token; - while (!token.IsCancellationRequested) + while (await this._storage.EventChannel().Reader.WaitToReadAsync(token).ConfigureAwait(false)) { - var call = _client.SyncFlags(new SyncFlagsRequest - { - Selector = _config.SourceSelector - }); try { - // Read the response stream asynchronously - while (!token.IsCancellationRequested && await call.ResponseStream.MoveNext(token).ConfigureAwait(false)) - { - var response = call.ResponseStream.Current; - _evaluator.Sync(FlagConfigurationUpdateType.ALL, response.FlagConfiguration); - if (!latch.IsSet) - { - latch.Signal(); - } - HandleProviderReadyEvent(); - HandleProviderChangeEvent(); - } - } - catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) - { - // do nothing, we've been shutdown - } - catch (RpcException) - { - // Handle the dropped connection by reconnecting and retrying the stream - await HandleErrorEvent().ConfigureAwait(false); - } - } - } - - private void HandleProviderReadyEvent() - { - _mtx.WaitOne(); - _eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff; - if (!connected) - { - connected = true; - _eventChannel.Writer.TryWrite(new ProviderEventPayload { Type = ProviderEventTypes.ProviderReady, ProviderName = _providerMetadata.Name }); - } - _mtx.ReleaseMutex(); - } - - private async Task HandleErrorEvent() - { - _mtx.WaitOne(); - _eventStreamRetryBackoff = Math.Min(_eventStreamRetryBackoff * 2, MaxEventStreamRetryBackoff); - if (connected) - { - connected = false; - _eventChannel.Writer.TryWrite(new ProviderEventPayload { Type = ProviderEventTypes.ProviderError, ProviderName = _providerMetadata.Name }); - } - _mtx.ReleaseMutex(); - await Task.Delay(_eventStreamRetryBackoff * 1000).ConfigureAwait(false); - } - - private void HandleProviderChangeEvent() - { - _mtx.WaitOne(); - if (connected) - { - _eventChannel.Writer.TryWrite(new ProviderEventPayload { Type = ProviderEventTypes.ProviderConfigurationChanged, ProviderName = _providerMetadata.Name }); - } - _mtx.ReleaseMutex(); - } - - private T BuildClient(FlagdConfig config, Func constructorFunc) - { - var useUnixSocket = config.GetUri().ToString().StartsWith("unix://"); - - if (!useUnixSocket) - { -#if NET462_OR_GREATER - var handler = new WinHttpHandler(); -#else - var handler = new HttpClientHandler(); -#endif - if (config.UseCertificate) - { - if (File.Exists(config.CertificatePath)) - { - X509Certificate2 certificate = new X509Certificate2(config.CertificatePath); -#if NET5_0_OR_GREATER - handler.ServerCertificateCustomValidationCallback = (message, cert, chain, _) => { - // the the custom cert to the chain, Build returns a bool if valid. - chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust; - chain.ChainPolicy.CustomTrustStore.Add(certificate); - return chain.Build(cert); - }; -#elif NET462_OR_GREATER - handler.ServerCertificateValidationCallback = (message, cert, chain, errors) => - { - if (errors == SslPolicyErrors.None) { return true; } - - chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority; - - chain.ChainPolicy.ExtraStore.Add(certificate); - - var isChainValid = chain.Build(cert); - - if (!isChainValid) { return false; } - - var isValid = chain.ChainElements - .Cast() - .Any(x => x.Certificate.RawData.SequenceEqual(certificate.GetRawCertData())); - - return isValid; - }; -#else - throw new ArgumentException("Custom Certificates are not supported on your platform"); -#endif - } - else + var storageEvent = await this._storage.EventChannel().Reader.ReadAsync().ConfigureAwait(false); + switch (storageEvent.EventType) { - throw new ArgumentException("Specified certificate cannot be found."); + case StorageEvent.Type.READY: + _evaluator.Sync(FlagConfigurationUpdateType.ALL, storageEvent.FlagConfiguration); + if (!latch.IsSet) + { + _eventChannel.Writer.TryWrite(new ProviderEventPayload + { + Type = ProviderEventTypes.ProviderReady, ProviderName = _providerMetadata.Name + }); + latch.Signal(); + } + + break; + case StorageEvent.Type.CHANGED: + _evaluator.Sync(FlagConfigurationUpdateType.ALL, storageEvent.FlagConfiguration); + _eventChannel.Writer.TryWrite(new ProviderEventPayload + { + Type = ProviderEventTypes.ProviderConfigurationChanged, + ProviderName = _providerMetadata.Name + }); + break; + case StorageEvent.Type.ERROR: + _eventChannel.Writer.TryWrite(new ProviderEventPayload + { + Type = ProviderEventTypes.ProviderError, ProviderName = _providerMetadata.Name + }); + break; } } - _channel = GrpcChannel.ForAddress(config.GetUri(), new GrpcChannelOptions - { - HttpHandler = handler - }); - return constructorFunc(_channel); - - } - -#if NET5_0_OR_GREATER - var udsEndPoint = new UnixDomainSocketEndPoint(config.GetUri().ToString().Substring("unix://".Length)); - var connectionFactory = new UnixDomainSocketConnectionFactory(udsEndPoint); - var socketsHttpHandler = new SocketsHttpHandler - { - ConnectCallback = connectionFactory.ConnectAsync - }; - - // point to localhost and let the custom ConnectCallback handle the communication over the unix socket - // see https://learn.microsoft.com/en-us/aspnet/core/grpc/interprocess-uds?view=aspnetcore-7.0 for more details - _channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions - { - HttpHandler = socketsHttpHandler, - }); - return constructorFunc(_channel); -#endif - // unix socket support is not available in this dotnet version - throw new Exception("unix sockets are not supported in this version."); - } - - private FlagSyncService.FlagSyncServiceClient BuildClientForPlatform(FlagdConfig config) - { - var useUnixSocket = config.GetUri().ToString().StartsWith("unix://"); - - if (!useUnixSocket) - { -#if NET462_OR_GREATER - var handler = new WinHttpHandler(); -#else - var handler = new HttpClientHandler(); -#endif - if (config.UseCertificate) + catch (Exception) { - if (File.Exists(config.CertificatePath)) - { - X509Certificate2 certificate = new X509Certificate2(config.CertificatePath); -#if NET5_0_OR_GREATER - handler.ServerCertificateCustomValidationCallback = (message, cert, chain, _) => { - // the the custom cert to the chain, Build returns a bool if valid. - chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust; - chain.ChainPolicy.CustomTrustStore.Add(certificate); - return chain.Build(cert); - }; -#elif NET462_OR_GREATER - handler.ServerCertificateValidationCallback = (message, cert, chain, errors) => - { - if (errors == SslPolicyErrors.None) { return true; } - - chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority; - - chain.ChainPolicy.ExtraStore.Add(certificate); - - var isChainValid = chain.Build(cert); - - if (!isChainValid) { return false; } - - var isValid = chain.ChainElements - .Cast() - .Any(x => x.Certificate.RawData.SequenceEqual(certificate.GetRawCertData())); - - return isValid; - }; -#else - throw new ArgumentException("Custom Certificates are not supported on your platform"); -#endif - } - else + _eventChannel.Writer.TryWrite(new ProviderEventPayload { - throw new ArgumentException("Specified certificate cannot be found."); - } + Type = ProviderEventTypes.ProviderError, ProviderName = _providerMetadata.Name + }); } - return new FlagSyncService.FlagSyncServiceClient(GrpcChannel.ForAddress(config.GetUri(), new GrpcChannelOptions - { - HttpHandler = handler - })); } - -#if NET5_0_OR_GREATER - var udsEndPoint = new UnixDomainSocketEndPoint(config.GetUri().ToString().Substring("unix://".Length)); - var connectionFactory = new UnixDomainSocketConnectionFactory(udsEndPoint); - var socketsHttpHandler = new SocketsHttpHandler - { - ConnectCallback = connectionFactory.ConnectAsync - }; - - // point to localhost and let the custom ConnectCallback handle the communication over the unix socket - // see https://learn.microsoft.com/en-us/aspnet/core/grpc/interprocess-uds?view=aspnetcore-7.0 for more details - return new FlagSyncService.FlagSyncServiceClient(GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions - { - HttpHandler = socketsHttpHandler, - })); -#endif - // unix socket support is not available in this dotnet version - throw new Exception("unix sockets are not supported in this version."); } - } diff --git a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/FileStorage.cs b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/FileStorage.cs new file mode 100644 index 000000000..c50365615 --- /dev/null +++ b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/FileStorage.cs @@ -0,0 +1,63 @@ +using System.IO; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess.Storage; + +internal class FileStorage: Storage +{ + private readonly Channel _eventChannel = Channel.CreateBounded(1); + private readonly string _path; + private readonly FileSystemWatcher _fileSystemWatcher; + + internal FileStorage(FlagdConfig config) + { + _path = config.OfflineFlagSourceFullPath; + _fileSystemWatcher = new FileSystemWatcher(Path.GetDirectoryName(_path), Path.GetFileName(_path)) + { + EnableRaisingEvents = true, + NotifyFilter = NotifyFilters.LastWrite, + }; + _fileSystemWatcher.Changed += (_, _) => this.HandleFileChanged(); + } + + public Task Init() + { + return Task.Run(() => + { + var latch = new CountdownEvent(1); + new Thread(() => + { + var file = File.ReadAllText(_path); + + if (!latch.IsSet) + { + latch.Signal(); + } + this._eventChannel.Writer.TryWrite(new StorageEvent(StorageEvent.Type.READY, file)); + }) + { + IsBackground = true + }.Start(); + latch.Wait(); + }); + } + + public Task Shutdown() + { + _fileSystemWatcher.Dispose(); + return Task.CompletedTask; + } + + public Channel EventChannel() + { + return this._eventChannel; + } + + private void HandleFileChanged() + { + var file = File.ReadAllText(_path); + this._eventChannel.Writer.TryWrite(new StorageEvent(StorageEvent.Type.CHANGED, file)); + } +} diff --git a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/RpcStorage.cs b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/RpcStorage.cs new file mode 100644 index 000000000..b91c369e4 --- /dev/null +++ b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/RpcStorage.cs @@ -0,0 +1,235 @@ +using System; +using System.IO; +using System.Net.Http; +#if NET462_OR_GREATER +using System.Net.Security; +using System.Linq; +#endif +#if NET8_0_OR_GREATER +using System.Net.Sockets; +#endif +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Net.Client; +using OpenFeature.Flagd.Grpc.Sync; + +namespace OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess.Storage; + +internal class RpcStorage: Storage +{ + static readonly int InitialEventStreamRetryBaseBackoff = 1; + static readonly int MaxEventStreamRetryBackoff = 60; + readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private readonly FlagSyncService.FlagSyncServiceClient _client; + private GrpcChannel _channel; + private string _sourceSelector; + private readonly Channel _eventQueue = Channel.CreateBounded(1); + private bool connected = false; + private readonly Mutex _mtx; + private int _eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff; + + internal RpcStorage(FlagdConfig config) + { + _client = BuildClient(config, channel => new FlagSyncService.FlagSyncServiceClient(channel)); + _sourceSelector = config.SourceSelector; + _mtx = new Mutex(); + } + + internal RpcStorage(FlagSyncService.FlagSyncServiceClient client, FlagdConfig config): this(config) + { + _client = client; + _sourceSelector = config.SourceSelector; + } + + public Task Init() + { + return Task.Run(() => + { + var latch = new CountdownEvent(1); + new Thread(async () => await this.HandleStreamEvents(latch).ConfigureAwait(false)) + { + IsBackground = true + }.Start(); + latch.Wait(); + }); + } + + public Task Shutdown() + { + _cancellationTokenSource.Cancel(); + return _channel?.ShutdownAsync().ContinueWith((t) => + { + _channel.Dispose(); + if (t.IsFaulted) throw t.Exception; + }); + } + + public Channel EventChannel() + { + return this._eventQueue; + } + + private async Task HandleStreamEvents(CountdownEvent latch) + { + CancellationToken token = _cancellationTokenSource.Token; + while (!token.IsCancellationRequested) + { + var call = _client.SyncFlags(new SyncFlagsRequest + { + Selector = _sourceSelector + }); + try + { + while (!token.IsCancellationRequested && await call.ResponseStream.MoveNext().ConfigureAwait(false)) + { + var response = call.ResponseStream.Current; + if (!latch.IsSet) + { + latch.Signal(); + } + + HandleStorageReadyEvent(response.FlagConfiguration); + HandleStorageChangedEvent(response.FlagConfiguration); + } + } + catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) + { + // do nothing, we've been shutdown + HandleProviderStaleEvent(); + } + catch (RpcException) + { + // Handle the dropped connection by reconnecting and retrying the stream + await HandleErrorEvent().ConfigureAwait(false); + } + } + } + + private void HandleStorageChangedEvent(string flagConfiguration) + { + _mtx.WaitOne(); + _eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff; + if (connected) + { + this._eventQueue.Writer.TryWrite(new StorageEvent(StorageEvent.Type.CHANGED, flagConfiguration)); + } + _mtx.ReleaseMutex(); + } + + private void HandleStorageReadyEvent(string flagConfiguration) + { + _mtx.WaitOne(); + _eventStreamRetryBackoff = InitialEventStreamRetryBaseBackoff; + if (!connected) + { + connected = true; + this._eventQueue.Writer.TryWrite(new StorageEvent(StorageEvent.Type.READY, flagConfiguration)); + } + _mtx.ReleaseMutex(); + } + + private async Task HandleErrorEvent() + { + _mtx.WaitOne(); + _eventStreamRetryBackoff = Math.Min(_eventStreamRetryBackoff * 2, MaxEventStreamRetryBackoff); + if (connected) + { + connected = false; + this._eventQueue.Writer.TryWrite(new StorageEvent(StorageEvent.Type.ERROR)); + } + _mtx.ReleaseMutex(); + await Task.Delay(_eventStreamRetryBackoff * 1000).ConfigureAwait(false); + } + + private void HandleProviderStaleEvent() + { + _mtx.WaitOne(); + if (connected) + { + this._eventQueue.Writer.TryWrite(new StorageEvent(StorageEvent.Type.STALE)); + } + _mtx.ReleaseMutex(); + } + + private T BuildClient(FlagdConfig config, Func constructorFunc) + { + var useUnixSocket = config.GetUri().ToString().StartsWith("unix://"); + + if (!useUnixSocket) + { +#if NET462_OR_GREATER + var handler = new WinHttpHandler(); +#else + var handler = new HttpClientHandler(); +#endif + if (config.UseCertificate) + { + if (File.Exists(config.CertificatePath)) + { + X509Certificate2 certificate = new X509Certificate2(config.CertificatePath); +#if NET5_0_OR_GREATER + handler.ServerCertificateCustomValidationCallback = (message, cert, chain, _) => { + // the the custom cert to the chain, Build returns a bool if valid. + chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust; + chain.ChainPolicy.CustomTrustStore.Add(certificate); + return chain.Build(cert); + }; +#elif NET462_OR_GREATER + handler.ServerCertificateValidationCallback = (message, cert, chain, errors) => + { + if (errors == SslPolicyErrors.None) { return true; } + + chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority; + + chain.ChainPolicy.ExtraStore.Add(certificate); + + var isChainValid = chain.Build(cert); + + if (!isChainValid) { return false; } + + var isValid = chain.ChainElements + .Cast() + .Any(x => x.Certificate.RawData.SequenceEqual(certificate.GetRawCertData())); + + return isValid; + }; +#else + throw new ArgumentException("Custom Certificates are not supported on your platform"); +#endif + } + else + { + throw new ArgumentException("Specified certificate cannot be found."); + } + } + _channel = GrpcChannel.ForAddress(config.GetUri(), new GrpcChannelOptions + { + HttpHandler = handler + }); + return constructorFunc(_channel); + + } + +#if NET5_0_OR_GREATER + var udsEndPoint = new UnixDomainSocketEndPoint(config.GetUri().ToString().Substring("unix://".Length)); + var connectionFactory = new UnixDomainSocketConnectionFactory(udsEndPoint); + var socketsHttpHandler = new SocketsHttpHandler + { + ConnectCallback = connectionFactory.ConnectAsync + }; + + // point to localhost and let the custom ConnectCallback handle the communication over the unix socket + // see https://learn.microsoft.com/en-us/aspnet/core/grpc/interprocess-uds?view=aspnetcore-7.0 for more details + _channel = GrpcChannel.ForAddress("http://localhost", new GrpcChannelOptions + { + HttpHandler = socketsHttpHandler, + }); + return constructorFunc(_channel); +#endif + // unix socket support is not available in this dotnet version + throw new Exception("unix sockets are not supported in this version."); + } +} diff --git a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/Storage.cs b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/Storage.cs new file mode 100644 index 000000000..9fc4af219 --- /dev/null +++ b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/Storage.cs @@ -0,0 +1,11 @@ +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess.Storage; + +internal interface Storage +{ + Task Init(); + Task Shutdown(); + Channel EventChannel(); +} diff --git a/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/StorageEvent.cs b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/StorageEvent.cs new file mode 100644 index 000000000..1e0b3f9fa --- /dev/null +++ b/src/OpenFeature.Contrib.Providers.Flagd/Resolver/InProcess/Storage/StorageEvent.cs @@ -0,0 +1,21 @@ +namespace OpenFeature.Contrib.Providers.Flagd.Resolver.InProcess.Storage; + +internal class StorageEvent +{ + internal StorageEvent(Type type, string flagConfiguration = "") + { + EventType = type; + FlagConfiguration = flagConfiguration; + } + + public enum Type + { + READY, + CHANGED, + STALE, + ERROR, + } + + public string FlagConfiguration { get; } + public Type EventType { get; } +} diff --git a/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs b/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs index d77cd5c30..14fb1d913 100644 --- a/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs +++ b/test/OpenFeature.Contrib.Providers.Flagd.Test/FlagdProviderTest.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -808,4 +809,61 @@ await Utils.AssertUntilAsync( mockGrpcClient.Received(Quantity.AtLeastOne()).SyncFlags(Arg.Is(req => req.Selector == "source-selector"), null, null, CancellationToken.None); } + + [Fact] + public async Task TestInProcessFileResolver() + { + var path = Path.Combine(Directory.GetCurrentDirectory(), "flags.json"); + await File.WriteAllTextAsync(path, Utils.flags); + + var config = new FlagdConfig(); + config.CacheEnabled = true; + config.OfflineFlagSourceFullPath = path; + config.ResolverType = ResolverType.FILE; + + var inProcessResolver = new InProcessResolver(config, MakeChannel(), MakeProviderMetadata()); + var flagdProvider = new FlagdProvider(inProcessResolver); + await flagdProvider.InitializeAsync(EvaluationContext.Empty); + + // resolve with default set to false to make sure we return what the file gives us + await Utils.AssertUntilAsync( + async _ => + { + var val = await flagdProvider.ResolveBooleanValueAsync("staticBoolFlag", false, cancellationToken: CancellationToken.None).ConfigureAwait(false); + Assert.True(val.Value); + }); + } + + [Fact] + public async Task TestInProcessFileResolverChangeEvents() + { + var path = Path.Combine(Directory.GetCurrentDirectory(), "flags.json"); + await File.WriteAllTextAsync(path, Utils.flags); + + var config = new FlagdConfig(); + config.CacheEnabled = true; + config.OfflineFlagSourceFullPath = path; + config.ResolverType = ResolverType.FILE; + + var inProcessResolver = new InProcessResolver(config, MakeChannel(), MakeProviderMetadata()); + var flagdProvider = new FlagdProvider(inProcessResolver); + await flagdProvider.InitializeAsync(EvaluationContext.Empty); + + // resolve with default set to false to make sure we return what the file gives us + await Utils.AssertUntilAsync( + async _ => + { + var val = await flagdProvider.ResolveBooleanValueAsync("staticBoolFlag", false, cancellationToken: CancellationToken.None).ConfigureAwait(false); + Assert.True(val.Value); + }); + + await File.WriteAllTextAsync(path, Utils.validFlagConfig); + + await Utils.AssertUntilAsync( + async _ => + { + var exception = await Assert.ThrowsAsync(async () => await flagdProvider.ResolveBooleanValueAsync("static", false).ConfigureAwait(false)).ConfigureAwait(false); + Assert.Equal(ErrorType.FlagNotFound, exception.ErrorType); + }); + } }