TaskMessageBroker

UniRxに入っているAsyncMessageBrokerのTaskバージョン

using System;
using System.Collections.Generic;
using Cysharp.Threading.Tasks;

namespace Extensions
{
    public interface ITaskMessagePublisher
    {
        UniTask PublishAsync<T>(T message);
    }

    public interface ITaskMessageReceiver
    {
        IDisposable Subscribe<T>(Func<T, UniTask> asyncMessageReceiver);
    }

    public interface ITaskMessageBroker : ITaskMessagePublisher, ITaskMessageReceiver
    {
    }

    public class TaskMessageBroker : ITaskMessageBroker, IDisposable
    {
        /// <summary>
        /// AsyncMessageBroker in Global scope.
        /// </summary>
        public static readonly ITaskMessageBroker Default = new TaskMessageBroker();

        bool isDisposed = false;
        readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();

        public UniTask PublishAsync<T>(T message)
        {
            List<Func<T, UniTask>> notifier;
            lock (notifiers)
            {
                if (isDisposed) throw new ObjectDisposedException("TaskMessageBroker");

                object _notifier;
                if (notifiers.TryGetValue(typeof(T), out _notifier))
                {
                    notifier = (List<Func<T, UniTask>>)_notifier;
                }
                else
                {
                    return UniTask.CompletedTask;
                }
            }

            return UniTask.WhenAll(notifier.Select(x => x.Invoke(message)));
        }

        public IDisposable Subscribe<T>(Func<T, UniTask> asyncMessageReceiver)
        {
            lock (notifiers)
            {
                if (isDisposed) throw new ObjectDisposedException("TaskMessageBroker");

                object _notifier;
                if (!notifiers.TryGetValue(typeof(T), out _notifier))
                {
                    var notifier = new List<Func<T, UniTask>>();
                    notifier.Add(asyncMessageReceiver);
                    notifiers.Add(typeof(T), notifier);
                }
                else
                {
                    var notifier = (List<Func<T, UniTask>>)_notifier;
                    notifier.Add(asyncMessageReceiver);
                    notifiers[typeof(T)] = notifier;
                }
            }

            return new Subscription<T>(this, asyncMessageReceiver);
        }

        public void Dispose()
        {
            lock (notifiers)
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    notifiers.Clear();
                }
            }
        }

        class Subscription<T> : IDisposable
        {
            readonly TaskMessageBroker parent;
            readonly Func<T, UniTask> asyncMessageReceiver;

            public Subscription(TaskMessageBroker parent, Func<T, UniTask> asyncMessageReceiver)
            {
                this.parent = parent;
                this.asyncMessageReceiver = asyncMessageReceiver;
            }

            public void Dispose()
            {
                lock (parent.notifiers)
                {
                    object _notifier;
                    if (parent.notifiers.TryGetValue(typeof(T), out _notifier))
                    {
                        var notifier = (List<Func<T, UniTask>>)_notifier;
                        notifier.Remove(asyncMessageReceiver);

                        parent.notifiers[typeof(T)] = notifier;
                    }
                }
            }
        }
    }
}