Orleans集群及Placement设置
服务端界面使用相同的clusterid和serviceid,相同ip地址,不同网关端口号和服务端口号,启动两个silo服务,并使用MySql数据库做Silo间信息同步,实现集群。
silo服务启动代码如下(从nuget下载Microsoft.Orleans.Clustering.AdoNet库):
var clusterID = ClusterID;
var serviceID = ServiceID;
var siloPort = SiloPort;
var gatewayPort = GatewayPort;
//本地集群方式部署
var primarySiloEndPoint = new IPEndPoint(IPAddress.Parse(PrimarySiloIPAddr), siloPort);
var silo = new HostBuilder()
.UseOrleans(builder =>
{
builder.UseAdoNetClustering(options =>
{
options.Invariant = GlobalValueDefinition.MySqlInvariant;
options.ConnectionString = GlobalValueDefinition.MySqlConnection;
})
.Configure<ClusterOptions>(options =>
{
options.ClusterId = clusterID;
options.ServiceId = serviceID;
})
.ConfigureEndpoints(siloPort: siloPort, gatewayPort: gatewayPort)
.ConfigureLogging(logging => logging.AddConsole())
.UseDashboard(options =>
{
options.Username = "henreash";
options.Password = "123456";
options.Host = "*";
options.Port = 8081;
})
//.AddMemoryStreams(GlobalValueDefinition.StreamProviderName) //unget引入 Microsoft.Orleans.Streaming
.AddMemoryGrainStorage(GlobalValueDefinition.GrainStorageName)
.AddPlacementDirector<MyPlacementStrategy>(sp=>new MyPlacementDirector())
#region 拦截器
.AddIncomingGrainCallFilter(async context =>
{
if (context.InterfaceMethod.Name == nameof(IHello.HelloCallFilter))
{
RequestContext.Set("CallFilterValue", "this value was added by the filter");
}
await context.Invoke();
if (context.InterfaceMethod.Name == nameof(IHello.HelloCallFilter))
{
context.Result = $"{context.Result},added by the filter {clusterID} - {serviceID} - {gatewayPort}";
}
}
);
#endregion
#region 使用NewtonJson做序列化引擎
//需引用Microsoft.Orleans.Serialization.NewtonsoftJson包
builder.Services.AddSerializer(serializerBuilder =>
{
serializerBuilder.AddNewtonsoftJsonSerializer(isSupported: type => true/*type.Namespace.StartsWith("Example.Namespace")*/);
});
#endregion
}).Build();
await silo.RunAsync();
MySql数据库常量定义:
public const string MySqlInvariant = "MySql.Data.MySqlClient";
public const string MySqlConnection = "server=localhost;user id=root;database=orleans_test;port=3306;password=xxxxxx";
客户端启动代码(从nuget下载Microsoft.Orleans.Clustering.AdoNet库):
var clusterID = ClusterID;
var serviceID = ServiceID;
var serviceID02 = ServiceID02;
var PRIMARY_SILO_IP_ADDRESS = IPAddress.Parse(textBox_IPAddr.Text);
#region ADO服务器群集模式连接
host01 = Host.CreateDefaultBuilder()
.UseOrleansClient(clientBuilder =>
clientBuilder.UseAdoNetClustering(options => {
options.Invariant = GlobalValueDefinition.MySqlInvariant;
options.ConnectionString = GlobalValueDefinition.MySqlConnection;
})
.Configure<ClusterOptions>(options =>
{
options.ClusterId = clusterID;
options.ServiceId = serviceID;
})
#region Stream测试
.AddMemoryStreams(GlobalValueDefinition.StreamProviderName)
#endregion
).Build();
await host01.StartAsync();
client01 = host01.Services.GetRequiredService<IClusterClient>();
#endregion
#region 订阅
friend1 = client01.GetGrain<IHello>("user1");
friend2 = client01.GetGrain<IHello>("user2");
Chat c = new Chat();
(c as IChatNotify).ReceiveAct += (text, data) => ShowLog($"user1 callback {text} dateLen={data.Length}");
chat = client01.CreateObjectReference<IChat>(c);
await friend1.Subscribe(chat);
Chat c02 = new Chat();
(c02 as IChatNotify).ReceiveAct += (text, data) => ShowLog($"user2 callback {text} dateLen={data.Length}");
chat02 = client01.CreateObjectReference<IChat>(c02);
await friend2.Subscribe(chat02);
#endregion
客户端只需指定clusterid、serviceid、mysql数据库连接字符串即可。测试过程中发现这种模式有几秒延时,服务端启动后立即启动客户端并进行连接,可能出现某个silo服务无法连接的情况。
Placement:
比如特定业务场景,服务器上部署了硬件外设,希望每个silo服务器都启动一个grain实例,可使用Placement机制进行制约;
首先定义Placement规则:
public class MyPlacementDirector : IPlacementDirector
{
public Task<SiloAddress> OnAddActivation(PlacementStrategy strategy, PlacementTarget target, IPlacementContext context)
{
var userID = target.GrainIdentity.Key.ToString();
var silos = context.GetCompatibleSilos(target).ToArray();
SiloAddress siloAddress = silos.First();
if (userID == "user1")
siloAddress = silos?.FirstOrDefault(x=>x.Endpoint.Port == 11111) ?? siloAddress;
else if(userID == "user2")
siloAddress = silos?.FirstOrDefault(x => x.Endpoint.Port == 11112) ?? siloAddress;
return Task.FromResult(siloAddress);
}
}
[Serializable]
public sealed class MyPlacementStrategy: PlacementStrategy
{
}
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public sealed class MyPlacementStrategyAttribute : PlacementAttribute
{
public MyPlacementStrategyAttribute() : base(new MyPlacementStrategy()) { }
}
上面代码做了简单约定,grain user1在端口11111的silo上创建,user2在端口11112的silo上创建。
Grain定义类上增加PlacementAttribute注解,服务端启动silo服务时注册自定义Placement规则:
另外Stream会根据StreamID创建额外的Grain,实际应用需注意。
双向通信过程中,grain向客户端发消息,数据量几十K效率还可以,过大(几兆)导致卡顿。
原文地址:https://blog.csdn.net/henreash/article/details/143666541
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!