自学内容网 自学内容网

Orleans集群及Placement设置

服务端界面使用相同的clusterid和serviceid,相同ip地址,不同网关端口号和服务端口号,启动两个silo服务,并使用MySql数据库做Silo间信息同步,实现集群。

 silo服务启动代码如下(从nuget下载Microsoft.Orleans.Clustering.AdoNet库):

            var clusterID = ClusterID;
            var serviceID = ServiceID;
            var siloPort = SiloPort;
            var gatewayPort = GatewayPort;
            //本地集群方式部署
            var primarySiloEndPoint = new IPEndPoint(IPAddress.Parse(PrimarySiloIPAddr), siloPort);
            var silo = new HostBuilder()
                .UseOrleans(builder =>
                {
                    builder.UseAdoNetClustering(options =>
                    {
                        options.Invariant = GlobalValueDefinition.MySqlInvariant;
                        options.ConnectionString = GlobalValueDefinition.MySqlConnection;
                    })
                    .Configure<ClusterOptions>(options =>
                    {
                        options.ClusterId = clusterID;
                        options.ServiceId = serviceID;
                    })
                    .ConfigureEndpoints(siloPort: siloPort, gatewayPort: gatewayPort)
                    .ConfigureLogging(logging => logging.AddConsole())
                    .UseDashboard(options =>
                    {
                        options.Username = "henreash";
                        options.Password = "123456";
                        options.Host = "*";
                        options.Port = 8081;
                    })
                    //.AddMemoryStreams(GlobalValueDefinition.StreamProviderName) //unget引入 Microsoft.Orleans.Streaming
                    .AddMemoryGrainStorage(GlobalValueDefinition.GrainStorageName)
                    .AddPlacementDirector<MyPlacementStrategy>(sp=>new MyPlacementDirector())
                    #region 拦截器
                    .AddIncomingGrainCallFilter(async context =>
                    {
                        if (context.InterfaceMethod.Name == nameof(IHello.HelloCallFilter))
                        {
                            RequestContext.Set("CallFilterValue", "this value was added by the filter");
                        }
                        await context.Invoke();
                        if (context.InterfaceMethod.Name == nameof(IHello.HelloCallFilter))
                        {
                            context.Result = $"{context.Result},added by the filter  {clusterID} - {serviceID}  - {gatewayPort}";
                        }
                    }
                    );
                    #endregion
                    #region 使用NewtonJson做序列化引擎
                    //需引用Microsoft.Orleans.Serialization.NewtonsoftJson包
                    builder.Services.AddSerializer(serializerBuilder =>
                    {
                        serializerBuilder.AddNewtonsoftJsonSerializer(isSupported: type => true/*type.Namespace.StartsWith("Example.Namespace")*/);
                    });
                    #endregion
                }).Build();
            await silo.RunAsync();

MySql数据库常量定义:

        public const string MySqlInvariant = "MySql.Data.MySqlClient";
        public const string MySqlConnection = "server=localhost;user id=root;database=orleans_test;port=3306;password=xxxxxx";

 

客户端启动代码(从nuget下载Microsoft.Orleans.Clustering.AdoNet库):

            var clusterID = ClusterID;
            var serviceID = ServiceID;
            var serviceID02 = ServiceID02;
            var PRIMARY_SILO_IP_ADDRESS = IPAddress.Parse(textBox_IPAddr.Text);
            #region ADO服务器群集模式连接
            host01 = Host.CreateDefaultBuilder()
                .UseOrleansClient(clientBuilder =>
                    clientBuilder.UseAdoNetClustering(options => {
                        options.Invariant = GlobalValueDefinition.MySqlInvariant;
                        options.ConnectionString = GlobalValueDefinition.MySqlConnection;
                    })
                    .Configure<ClusterOptions>(options =>
                    {
                        options.ClusterId = clusterID;
                        options.ServiceId = serviceID;
                    })
                #region Stream测试
                    .AddMemoryStreams(GlobalValueDefinition.StreamProviderName)
                #endregion
                ).Build();
            await host01.StartAsync();

            client01 = host01.Services.GetRequiredService<IClusterClient>();
            #endregion

            #region 订阅
            friend1 = client01.GetGrain<IHello>("user1");
            friend2 = client01.GetGrain<IHello>("user2");
            Chat c = new Chat();
            (c as IChatNotify).ReceiveAct += (text, data) => ShowLog($"user1 callback {text} dateLen={data.Length}");
            chat = client01.CreateObjectReference<IChat>(c);
            await friend1.Subscribe(chat);
            Chat c02 = new Chat();
            (c02 as IChatNotify).ReceiveAct += (text, data) => ShowLog($"user2 callback {text} dateLen={data.Length}");
            chat02 = client01.CreateObjectReference<IChat>(c02);
            await friend2.Subscribe(chat02);
            #endregion

客户端只需指定clusterid、serviceid、mysql数据库连接字符串即可。测试过程中发现这种模式有几秒延时,服务端启动后立即启动客户端并进行连接,可能出现某个silo服务无法连接的情况。

 Placement:

比如特定业务场景,服务器上部署了硬件外设,希望每个silo服务器都启动一个grain实例,可使用Placement机制进行制约;

首先定义Placement规则:

    public class MyPlacementDirector : IPlacementDirector
    {
        public Task<SiloAddress> OnAddActivation(PlacementStrategy strategy, PlacementTarget target, IPlacementContext context)
        {
            var userID = target.GrainIdentity.Key.ToString();
            var silos = context.GetCompatibleSilos(target).ToArray();
            SiloAddress siloAddress = silos.First();
            if (userID == "user1")
                siloAddress = silos?.FirstOrDefault(x=>x.Endpoint.Port == 11111) ?? siloAddress;
            else if(userID == "user2")
                siloAddress = silos?.FirstOrDefault(x => x.Endpoint.Port == 11112) ?? siloAddress;
            return Task.FromResult(siloAddress);
        }
    }

    [Serializable]
    public sealed class MyPlacementStrategy: PlacementStrategy
    {

    }

    [AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
    public sealed class MyPlacementStrategyAttribute : PlacementAttribute
    {
        public MyPlacementStrategyAttribute() : base(new MyPlacementStrategy()) { }
    }

 上面代码做了简单约定,grain user1在端口11111的silo上创建,user2在端口11112的silo上创建。

Grain定义类上增加PlacementAttribute注解,服务端启动silo服务时注册自定义Placement规则:

另外Stream会根据StreamID创建额外的Grain,实际应用需注意。

双向通信过程中,grain向客户端发消息,数据量几十K效率还可以,过大(几兆)导致卡顿。


原文地址:https://blog.csdn.net/henreash/article/details/143666541

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!