1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
// Startup wiring, shown as two independent statements.
// 1) Registers the bus created by RabbitHutch.CreateBus as a singleton instance in the service collection.
// 2) Calls the UseSubscribe extension with "OrderService" as the subscription id prefix and the
//    currently executing assembly as the place to look for consumers.
// NOTE(review): presumably the first statement lives in ConfigureServices and the second in Configure — confirm.
// NOTE(review): the connection string embeds a username and password in source; consider loading it from configuration.
| services.AddSingleton(RabbitHutch.CreateBus("host=localhost;username=admin;password=admin;timeout=60")); app.UseSubscribe("OrderService", Assembly.GetExecutingAssembly());
/// <summary>
/// Application-builder extensions that hook message-bus auto-subscription into the host lifetime events.
/// </summary>
public static class EApplicationExtenssion
{
    /// <summary>
    /// Registers lifetime callbacks so that, once the application has started, an
    /// <c>AutoSubscriber</c> built from the registered <c>IBus</c> subscribes the consumers found in
    /// <paramref name="assembly"/>, and so that the bus is disposed once the application has stopped.
    /// </summary>
    /// <param name="appBuilder">The application builder whose root service provider supplies the bus and the host lifetime.</param>
    /// <param name="subscriptionIdPrefix">Prefix handed to the <c>AutoSubscriber</c> constructor.</param>
    /// <param name="assembly">Assembly passed to the subscriber to scan for consumers.</param>
    /// <returns>The same <paramref name="appBuilder"/>, to allow call chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="appBuilder"/> or <paramref name="assembly"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The bus or the host lifetime service is not registered.</exception>
    public static IApplicationBuilder UseSubscribe(this IApplicationBuilder appBuilder, string subscriptionIdPrefix, Assembly assembly)
    {
        if (appBuilder is null)
        {
            throw new ArgumentNullException(nameof(appBuilder));
        }

        if (assembly is null)
        {
            throw new ArgumentNullException(nameof(assembly));
        }

        // Resolve from the root provider instead of a freshly created scope: a scope created here
        // would never be disposed, and both services are expected to be application-lifetime singletons.
        var services = appBuilder.ApplicationServices;

        // GetRequiredService fails immediately with a descriptive error when a registration is missing,
        // instead of handing back null and failing later with a NullReferenceException.
        var lifeTime = services.GetRequiredService<Microsoft.Extensions.Hosting.IHostApplicationLifetime>();
        var bus = services.GetRequiredService<IBus>();

        lifeTime.ApplicationStarted.Register(() =>
        {
            var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
            var assemblies = new Assembly[] { assembly };

            // NOTE(review): both the sync and async subscription entry points are invoked, as before.
            // Depending on the library version SubscribeAsync may return a Task that is not awaited here — confirm.
            subscriber.Subscribe(assemblies);
            subscriber.SubscribeAsync(assemblies);
        });

        // The bus is disposed explicitly once the host has fully stopped.
        lifeTime.ApplicationStopped.Register(() =>
        {
            bus.Dispose();
        });

        return appBuilder;
    }
}
/// <summary>
/// Sample controller that hands a delayed <see cref="Order"/> publish to the bus scheduler.
/// </summary>
public class TestController : DefaultController
{
    private readonly IBus _messageBus;

    /// <summary>
    /// Stores the supplied data context and bus for later use by the actions.
    /// </summary>
    /// <param name="ctx">Data context assigned to the <c>ctx</c> member.</param>
    /// <param name="bus">Bus used to schedule messages.</param>
    public TestController(EFDataContext ctx, IBus bus)
    {
        this.ctx = ctx;
        _messageBus = bus;
    }

    /// <summary>
    /// Calls <c>FuturePublish</c> on the bus scheduler with an <see cref="Order"/> whose
    /// <see cref="Order.OrderId"/> is 999 and a five-second <see cref="TimeSpan"/>, then returns
    /// the result of <c>Success()</c>.
    /// </summary>
    public ActionResult Index()
    {
        var order = new Order { OrderId = 999 };
        var delay = TimeSpan.FromSeconds(5);

        _messageBus.Scheduler.FuturePublish(order, delay);

        return Success();
    }
}

/// <summary>
/// Message type carrying an order identifier; annotated with the "Qka.Order" queue and exchange names.
/// </summary>
[Queue("Qka.Order", ExchangeName = "Qka.Order")]
public class Order
{
    /// <summary>Identifier of the order.</summary>
    public int OrderId { get; set; }
}

/// <summary>
/// Consumer for <see cref="Order"/> messages, tagged with the "OrderService" subscription id.
/// </summary>
public class OrderConsumer : IConsume<Order>
{
    /// <summary>
    /// Receives an <see cref="Order"/> message. The body is currently empty, so the message is ignored.
    /// </summary>
    /// <param name="message">The received order message.</param>
    /// <param name="cancellationToken">Cancellation token supplied by the caller; unused.</param>
    [AutoSubscriberConsumer(SubscriptionId = "OrderService")]
    public void Consume(Order message, CancellationToken cancellationToken = default)
    {
        // No handling logic yet.
    }
}
|