Merge pull request #17557 from abpframework/liangshiwei/group13

Group13 Enable nullable annotations
pull/17565/head
maliming 2 years ago committed by GitHub
commit e621fea55f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -14,7 +14,7 @@ public class AzureServiceBusMessageConsumerFactory : IAzureServiceBusMessageCons
ServiceScope = serviceScopeFactory.CreateScope();
}
public IAzureServiceBusMessageConsumer CreateMessageConsumer(string topicName, string subscriptionName, string connectionName)
public IAzureServiceBusMessageConsumer CreateMessageConsumer(string topicName, string subscriptionName, string? connectionName)
{
var processor = ServiceScope.ServiceProvider.GetRequiredService<AzureServiceBusMessageConsumer>();
processor.Initialize(topicName, subscriptionName, connectionName);

@ -28,7 +28,7 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
Logger = new NullLogger<ConnectionPool>();
}
public ServiceBusClient GetClient(string connectionName)
public ServiceBusClient GetClient(string? connectionName)
{
connectionName ??= AzureServiceBusConnections.DefaultConnectionName;
return _clients.GetOrAdd(
@ -40,7 +40,7 @@ public class ConnectionPool : IConnectionPool, ISingletonDependency
).Value;
}
public ServiceBusAdministrationClient GetAdministrationClient(string connectionName)
public ServiceBusAdministrationClient GetAdministrationClient(string? connectionName)
{
connectionName ??= AzureServiceBusConnections.DefaultConnectionName;
return _adminClients.GetOrAdd(

@ -16,5 +16,5 @@ public interface IAzureServiceBusMessageConsumerFactory
IAzureServiceBusMessageConsumer CreateMessageConsumer(
string topicName,
string subscriptionName,
string connectionName);
string? connectionName);
}

@ -6,7 +6,7 @@ namespace Volo.Abp.AzureServiceBus;
public interface IConnectionPool : IAsyncDisposable
{
ServiceBusClient GetClient(string connectionName);
ServiceBusClient GetClient(string? connectionName);
ServiceBusAdministrationClient GetAdministrationClient(string connectionName);
ServiceBusAdministrationClient GetAdministrationClient(string? connectionName);
}

@ -6,5 +6,5 @@ namespace Volo.Abp.AzureServiceBus;
public interface IPublisherPool : IAsyncDisposable
{
Task<ServiceBusSender> GetAsync(string topicName, string connectionName);
Task<ServiceBusSender> GetAsync(string topicName, string? connectionName);
}

@ -24,7 +24,7 @@ public class PublisherPool : IPublisherPool, ISingletonDependency
Logger = new NullLogger<PublisherPool>();
}
public async Task<ServiceBusSender> GetAsync(string topicName, string connectionName)
public async Task<ServiceBusSender> GetAsync(string topicName, string? connectionName)
{
var admin = _connectionPool.GetAdministrationClient(connectionName);
await admin.SetupTopicAsync(topicName);

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<RootNamespace />
</PropertyGroup>

@ -4,7 +4,7 @@ public class DistributedEventReceived
{
public DistributedEventSource Source { get; set; }
public string EventName { get; set; }
public string EventName { get; set; } = default!;
public object EventData { get; set; }
public object EventData { get; set; } = default!;
}

@ -4,7 +4,7 @@ public class DistributedEventSent
{
public DistributedEventSource Source { get; set; }
public string EventName { get; set; }
public string EventName { get; set; } = default!;
public object EventData { get; set; }
public object EventData { get; set; } = default!;
}

@ -13,13 +13,13 @@ public class InboxConfig
get => _databaseName;
set => _databaseName = Check.NotNullOrWhiteSpace(value, nameof(DatabaseName));
}
[NotNull] private string _databaseName;
[NotNull] private string _databaseName = default!;
public Type ImplementationType { get; set; }
public Type ImplementationType { get; set; } = default!;
public Func<Type, bool> EventSelector { get; set; }
public Func<Type, bool>? EventSelector { get; set; }
public Func<Type, bool> HandlerSelector { get; set; }
public Func<Type, bool>? HandlerSelector { get; set; }
/// <summary>
/// Used to enable/disable processing incoming events.

@ -12,11 +12,11 @@ public class IncomingEventInfo : IHasExtraProperties
public Guid Id { get; }
public string MessageId { get; }
public string MessageId { get; } = default!;
public string EventName { get; }
public string EventName { get; } = default!;
public byte[] EventData { get; }
public byte[] EventData { get; } = default!;
public DateTime CreationTime { get; }
@ -47,7 +47,7 @@ public class IncomingEventInfo : IHasExtraProperties
ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId;
}
public string GetCorrelationId()
public string? GetCorrelationId()
{
return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString();
}

@ -13,11 +13,11 @@ public class OutboxConfig
get => _databaseName;
set => _databaseName = Check.NotNullOrWhiteSpace(value, nameof(DatabaseName));
}
[NotNull] private string _databaseName;
[NotNull] private string _databaseName = default!;
public Type ImplementationType { get; set; }
public Type ImplementationType { get; set; } = default!;
public Func<Type, bool> Selector { get; set; }
public Func<Type, bool>? Selector { get; set; }
/// <summary>
/// Used to enable/disable sending events from outbox to the message broker.

@ -12,9 +12,9 @@ public class OutgoingEventInfo : IHasExtraProperties
public Guid Id { get; }
public string EventName { get; }
public string EventName { get; } = default!;
public byte[] EventData { get; }
public byte[] EventData { get; } = default!;
public DateTime CreationTime { get; }
@ -43,7 +43,7 @@ public class OutgoingEventInfo : IHasExtraProperties
ExtraProperties[EventBusConsts.CorrelationIdHeaderName] = correlationId;
}
public string GetCorrelationId()
public string? GetCorrelationId()
{
return ExtraProperties.GetOrDefault(EventBusConsts.CorrelationIdHeaderName)?.ToString();
}

@ -23,12 +23,12 @@ public class EventNameAttribute : Attribute, IEventNameProvider
{
Check.NotNull(eventType, nameof(eventType));
return eventType
return (eventType
.GetCustomAttributes(true)
.OfType<IEventNameProvider>()
.FirstOrDefault()
?.GetName(eventType)
?? eventType.FullName;
?? eventType.FullName)!;
}
public string GetName(Type eventType)

@ -5,9 +5,9 @@ namespace Volo.Abp.EventBus;
[AttributeUsage(AttributeTargets.Class)]
public class GenericEventNameAttribute : Attribute, IEventNameProvider
{
public string Prefix { get; set; }
public string? Prefix { get; set; }
public string Postfix { get; set; }
public string? Postfix { get; set; }
public virtual string GetName(Type eventType)
{

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<AssemblyName>Volo.Abp.EventBus.Azure</AssemblyName>
<PackageId>Volo.Abp.EventBus.Azure</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>

@ -2,11 +2,11 @@ namespace Volo.Abp.EventBus.Azure;
public class AbpAzureEventBusOptions
{
public string ConnectionName { get; set; }
public string? ConnectionName { get; set; }
public string SubscriberName { get; set; }
public string SubscriberName { get; set; } = default!;
public string TopicName { get; set; }
public string TopicName { get; set; } = default!;
public bool IsServiceBusDisabled { get; set; }
}

@ -30,7 +30,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected IAzureServiceBusSerializer Serializer { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IAzureServiceBusMessageConsumer Consumer { get; private set; }
protected IAzureServiceBusMessageConsumer Consumer { get; private set; } = default!;
public AzureDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,
@ -268,7 +268,7 @@ public class AzureDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected virtual async Task PublishAsync(
string eventName,
byte[] body,
[CanBeNull] string correlationId,
string? correlationId,
Guid? eventId)
{
var message = new ServiceBusMessage(body)

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<RootNamespace />
</PropertyGroup>

@ -10,9 +10,9 @@ public class AbpDaprEventData
public string JsonData { get; set; }
public string CorrelationId { get; set; }
public string? CorrelationId { get; set; }
public AbpDaprEventData(string pubSubName, string topic, string messageId, string jsonData, string correlationId)
public AbpDaprEventData(string pubSubName, string topic, string messageId, string jsonData, string? correlationId)
{
PubSubName = pubSubName;
Topic = topic;

@ -114,7 +114,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory &&
(factory as SingleInstanceHandlerFactory).HandlerInstance == handler
(factory as SingleInstanceHandlerFactory)!.HandlerInstance == handler
);
});
}
@ -186,7 +186,7 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
}
}
public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, string messageId = null, string correlationId = null)
public virtual async Task TriggerHandlersAsync(Type eventType, object eventData, string? messageId = null, string? correlationId = null)
{
if (await AddToInboxAsync(messageId, EventNameAttribute.GetNameOrDefault(eventType), eventType, eventData, correlationId))
{
@ -245,15 +245,15 @@ public class DaprDistributedEventBus : DistributedEventBusBase, ISingletonDepend
public Type GetEventType(string eventName)
{
return EventTypes.GetOrDefault(eventName);
return EventTypes.GetOrDefault(eventName)!;
}
protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, Guid? messageId = null, string correlationId = null)
protected virtual async Task PublishToDaprAsync(Type eventType, object eventData, Guid? messageId = null, string? correlationId = null)
{
await PublishToDaprAsync(EventNameAttribute.GetNameOrDefault(eventType), eventData, messageId, correlationId);
}
protected virtual async Task PublishToDaprAsync(string eventName, object eventData, Guid? messageId = null, string correlationId = null)
protected virtual async Task PublishToDaprAsync(string eventName, object eventData, Guid? messageId = null, string? correlationId = null)
{
var client = DaprClientFactory.Create();
var data = new AbpDaprEventData(DaprEventBusOptions.PubSubName, eventName, (messageId ?? GuidGenerator.Create()).ToString("N"), Serializer.SerializeToString(eventData), correlationId);

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<RootNamespace />
</PropertyGroup>

@ -3,9 +3,9 @@
public class AbpKafkaEventBusOptions
{
public string ConnectionName { get; set; }
public string? ConnectionName { get; set; }
public string TopicName { get; set; }
public string TopicName { get; set; } = default!;
public string GroupId { get; set; }
public string GroupId { get; set; } = default!;
}

@ -29,7 +29,7 @@ public class KafkaDistributedEventBus : DistributedEventBusBase, ISingletonDepen
protected IProducerPool ProducerPool { get; }
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IKafkaMessageConsumer Consumer { get; private set; }
protected IKafkaMessageConsumer Consumer { get; private set; } = default!;
public KafkaDistributedEventBus(
IServiceScopeFactory serviceScopeFactory,

@ -4,9 +4,9 @@ namespace Volo.Abp.EventBus.Kafka;
public static class MessageExtensions
{
public static string GetMessageId<TKey, TValue>(this Message<TKey, TValue> message)
public static string? GetMessageId<TKey, TValue>(this Message<TKey, TValue> message)
{
string messageId = null;
string? messageId = null;
if (message.Headers.TryGetLastBytes("messageId", out var messageIdBytes))
{
@ -16,9 +16,9 @@ public static class MessageExtensions
return messageId;
}
public static string GetCorrelationId<TKey, TValue>(this Message<TKey, TValue> message)
public static string? GetCorrelationId<TKey, TValue>(this Message<TKey, TValue> message)
{
string correlationId = null;
string? correlationId = null;
if (message.Headers.TryGetLastBytes(EventBusConsts.CorrelationIdHeaderName, out var correlationIdBytes))
{

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<AssemblyName>Volo.Abp.EventBus.RabbitMQ</AssemblyName>
<PackageId>Volo.Abp.EventBus.RabbitMQ</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>

@ -6,13 +6,13 @@ public class AbpRabbitMqEventBusOptions
{
public const string DefaultExchangeType = RabbitMqConsts.ExchangeTypes.Direct;
public string ConnectionName { get; set; }
public string? ConnectionName { get; set; }
public string ClientName { get; set; }
public string ClientName { get; set; } = default!;
public string ExchangeName { get; set; }
public string ExchangeName { get; set; } = default!;
public string ExchangeType { get; set; }
public string? ExchangeType { get; set; }
public ushort? PrefetchCount { get; set; }
@ -20,6 +20,6 @@ public class AbpRabbitMqEventBusOptions
{
return string.IsNullOrEmpty(ExchangeType)
? DefaultExchangeType
: ExchangeType;
: ExchangeType!;
}
}

@ -35,7 +35,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
protected IRabbitMqMessageConsumerFactory MessageConsumerFactory { get; }
protected IRabbitMqMessageConsumer Consumer { get; private set; }
protected IRabbitMqMessageConsumer Consumer { get; private set; } = default!;
private bool _exchangeCreated;
@ -175,7 +175,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory &&
(factory as SingleInstanceHandlerFactory).HandlerInstance == handler
(factory as SingleInstanceHandlerFactory)!.HandlerInstance == handler
);
});
}
@ -282,9 +282,9 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
public virtual Task PublishAsync(
Type eventType,
object eventData,
Dictionary<string, object> headersArguments = null,
Dictionary<string, object>? headersArguments = null,
Guid? eventId = null,
[CanBeNull] string correlationId = null)
string? correlationId = null)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
var body = Serializer.Serialize(eventData);
@ -295,9 +295,9 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
protected virtual Task PublishAsync(
string eventName,
byte[] body,
Dictionary<string, object> headersArguments = null,
Dictionary<string, object>? headersArguments = null,
Guid? eventId = null,
[CanBeNull] string correlationId = null)
string? correlationId = null)
{
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
{
@ -309,9 +309,9 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
IModel channel,
string eventName,
byte[] body,
Dictionary<string, object> headersArguments = null,
Dictionary<string, object>? headersArguments = null,
Guid? eventId = null,
[CanBeNull] string correlationId = null)
string? correlationId = null)
{
EnsureExchangeExists(channel);
@ -366,7 +366,7 @@ public class RabbitMqDistributedEventBus : DistributedEventBusBase, ISingletonDe
_exchangeCreated = true;
}
private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments)
private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object>? headersArguments)
{
if (headersArguments == null)
{

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<AssemblyName>Volo.Abp.EventBus.Rebus</AssemblyName>
<PackageId>Volo.Abp.EventBus.Rebus</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>

@ -11,7 +11,7 @@ namespace Volo.Abp.EventBus.Rebus;
public class AbpRebusEventBusOptions
{
[NotNull]
public string InputQueueName { get; set; }
public string InputQueueName { get; set; } = default!;
[NotNull]
public Action<RebusConfigurer> Configurer {
@ -20,7 +20,7 @@ public class AbpRebusEventBusOptions
}
private Action<RebusConfigurer> _configurer;
public Func<IBus, Type, object, Task> Publish { get; set; }
public Func<IBus, Type, object, Task>? Publish { get; set; }
public AbpRebusEventBusOptions()
{

@ -165,7 +165,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var headers = new Dictionary<string, string>();
if (CorrelationIdProvider.Get() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, CorrelationIdProvider.Get());
headers.Add(EventBusConsts.CorrelationIdHeaderName, CorrelationIdProvider.Get()!);
}
await PublishAsync(eventType, eventData, headersArguments: headers);
}
@ -174,7 +174,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
Type eventType,
object eventData,
Guid? eventId = null,
Dictionary<string, string> headersArguments = null)
Dictionary<string, string>? headersArguments = null)
{
if (AbpRebusEventBusOptions.Publish != null)
{
@ -250,7 +250,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
OutgoingEventInfo outgoingEvent,
OutboxConfig outboxConfig)
{
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName);
var eventType = EventTypes.GetOrDefault(outgoingEvent.EventName)!;
var eventData = Serializer.Deserialize(outgoingEvent.EventData, eventType);
using (CorrelationIdProvider.Change(outgoingEvent.GetCorrelationId()))
@ -265,7 +265,7 @@ public class RebusDistributedEventBus : DistributedEventBusBase, ISingletonDepen
var headers = new Dictionary<string, string>();
if (outgoingEvent.GetCorrelationId() != null)
{
headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId());
headers.Add(EventBusConsts.CorrelationIdHeaderName, outgoingEvent.GetCorrelationId()!);
}
await PublishAsync(eventType, eventData, eventId: outgoingEvent.Id, headersArguments: headers);

@ -14,6 +14,6 @@ public class RebusDistributedEventHandlerAdapter<TEventData> : IHandleMessages<T
public async Task Handle(TEventData message)
{
await RebusDistributedEventBus.ProcessEventAsync(message.GetType(), message);
await RebusDistributedEventBus.ProcessEventAsync(message!.GetType(), message);
}
}

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<AssemblyName>Volo.Abp.EventBus</AssemblyName>
<PackageId>Volo.Abp.EventBus</PackageId>
<AssetTargetFallback>$(AssetTargetFallback);portable-net45+win8+wp8+wpa81;</AssetTargetFallback>

@ -133,7 +133,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
Serialize(eventData),
Clock.Now
);
outgoingEventInfo.SetCorrelationId(CorrelationIdProvider.Get());
outgoingEventInfo.SetCorrelationId(CorrelationIdProvider.Get()!);
await eventOutbox.EnqueueAsync(outgoingEventInfo);
return true;
}
@ -148,11 +148,11 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
}
protected async Task<bool> AddToInboxAsync(
string messageId,
string? messageId,
string eventName,
Type eventType,
object eventData,
[CanBeNull] string correlationId)
string? correlationId)
{
if (AbpDistributedEventBusOptions.Inboxes.Count <= 0)
{
@ -170,7 +170,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
if (!messageId.IsNullOrEmpty())
{
if (await eventInbox.ExistsByMessageIdAsync(messageId))
if (await eventInbox.ExistsByMessageIdAsync(messageId!))
{
continue;
}
@ -178,12 +178,12 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
var incomingEventInfo = new IncomingEventInfo(
GuidGenerator.Create(),
messageId,
messageId!,
eventName,
Serialize(eventData),
Clock.Now
);
incomingEventInfo.SetCorrelationId(correlationId);
incomingEventInfo.SetCorrelationId(correlationId!);
await eventInbox.EnqueueAsync(incomingEventInfo);
}
}
@ -206,7 +206,7 @@ public abstract class DistributedEventBusBase : EventBusBase, IDistributedEventB
await TriggerHandlersAsync(eventType, eventData);
}
protected virtual async Task TriggerHandlersFromInboxAsync(Type eventType, object eventData, List<Exception> exceptions, InboxConfig inboxConfig = null)
protected virtual async Task TriggerHandlersFromInboxAsync(Type eventType, object eventData, List<Exception> exceptions, InboxConfig? inboxConfig = null)
{
await TriggerDistributedEventReceivedAsync(new DistributedEventReceived
{

@ -21,13 +21,13 @@ public class InboxProcessor : IInboxProcessor, ITransientDependency
protected IAbpDistributedLock DistributedLock { get; }
protected IUnitOfWorkManager UnitOfWorkManager { get; }
protected IClock Clock { get; }
protected IEventInbox Inbox { get; private set; }
protected InboxConfig InboxConfig { get; private set; }
protected IEventInbox Inbox { get; private set; } = default!;
protected InboxConfig InboxConfig { get; private set; } = default!;
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected DateTime? LastCleanTime { get; set; }
protected string DistributedLockName { get; private set; }
protected string DistributedLockName { get; private set; } = default!;
public ILogger<InboxProcessor> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }
protected CancellationToken StoppingToken { get; }

@ -19,10 +19,10 @@ public class OutboxSender : IOutboxSender, ITransientDependency
protected AbpAsyncTimer Timer { get; }
protected IDistributedEventBus DistributedEventBus { get; }
protected IAbpDistributedLock DistributedLock { get; }
protected IEventOutbox Outbox { get; private set; }
protected OutboxConfig OutboxConfig { get; private set; }
protected IEventOutbox Outbox { get; private set; } = default!;
protected OutboxConfig OutboxConfig { get; private set; } = default!;
protected AbpEventBusBoxesOptions EventBusBoxesOptions { get; }
protected string DistributedLockName { get; private set; }
protected string DistributedLockName { get; private set; } = default!;
public ILogger<OutboxSender> Logger { get; set; }
protected CancellationTokenSource StoppingTokenSource { get; }

@ -131,7 +131,7 @@ public abstract class EventBusBase : IEventBus
}
}
protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions, InboxConfig inboxConfig = null)
protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions, InboxConfig? inboxConfig = null)
{
await new SynchronizationContextRemover();
@ -154,7 +154,7 @@ public abstract class EventBusBase : IEventBus
{
var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs)!;
await PublishToEventBusAsync(baseEventType, baseEventData);
}
}
@ -197,7 +197,7 @@ public abstract class EventBusBase : IEventBus
protected abstract IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType);
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType,
object eventData, List<Exception> exceptions, InboxConfig inboxConfig = null)
object eventData, List<Exception> exceptions, InboxConfig? inboxConfig = null)
{
using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
{
@ -218,7 +218,7 @@ public abstract class EventBusBase : IEventBus
}
catch (TargetInvocationException ex)
{
exceptions.Add(ex.InnerException);
exceptions.Add(ex.InnerException!);
}
catch (Exception ex)
{

@ -6,9 +6,9 @@ public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
{
public IEventHandler EventHandler { get; }
private readonly Action _disposeAction;
private readonly Action? _disposeAction;
public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null)
public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action? disposeAction = null)
{
_disposeAction = disposeAction;
EventHandler = eventHandler;

@ -23,12 +23,12 @@ public class EventHandlerInvoker : IEventHandlerInvoker, ISingletonDependency
if (typeof(ILocalEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler))
{
item.Local = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(LocalEventHandlerMethodExecutor<>).MakeGenericType(eventType));
item.Local = (IEventHandlerMethodExecutor?)Activator.CreateInstance(typeof(LocalEventHandlerMethodExecutor<>).MakeGenericType(eventType));
}
if (typeof(IDistributedEventHandler<>).MakeGenericType(eventType).IsInstanceOfType(eventHandler))
{
item.Distributed = (IEventHandlerMethodExecutor)Activator.CreateInstance(typeof(DistributedEventHandlerMethodExecutor<>).MakeGenericType(eventType));
item.Distributed = (IEventHandlerMethodExecutor?)Activator.CreateInstance(typeof(DistributedEventHandlerMethodExecutor<>).MakeGenericType(eventType));
}
return item;

@ -2,7 +2,7 @@
public class EventHandlerInvokerCacheItem
{
public IEventHandlerMethodExecutor Local { get; set; }
public IEventHandlerMethodExecutor? Local { get; set; }
public IEventHandlerMethodExecutor Distributed { get; set; }
public IEventHandlerMethodExecutor? Distributed { get; set; }
}

@ -105,7 +105,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
factories.RemoveAll(
factory =>
factory is SingleInstanceHandlerFactory &&
(factory as SingleInstanceHandlerFactory).HandlerInstance == handler
((factory as SingleInstanceHandlerFactory)!).HandlerInstance == handler
);
});
}
@ -177,7 +177,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
}
// Internal for unit testing
internal Func<Type, object, Task> OnEventHandleInvoking { get; set; }
internal Func<Type, object, Task>? OnEventHandleInvoking { get; set; }
// Internal for unit testing
protected async override Task InvokeEventHandlerAsync(IEventHandler eventHandler, object eventData, Type eventType)
@ -191,7 +191,7 @@ public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
}
// Internal for unit testing
internal Func<Type, object, Task> OnPublishing { get; set; }
internal Func<Type, object, Task>? OnPublishing { get; set; }
// For unit testing
public async override Task PublishAsync(

@ -64,6 +64,6 @@ public class TransientEventHandlerFactory : IEventHandlerFactory
protected virtual IEventHandler CreateHandler()
{
return (IEventHandler)Activator.CreateInstance(HandlerType);
return (IEventHandler)Activator.CreateInstance(HandlerType)!;
}
}

@ -5,6 +5,8 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net7.0</TargetFrameworks>
<Nullable>enable</Nullable>
<WarningsAsErrors>Nullable</WarningsAsErrors>
<RootNamespace />
</PropertyGroup>

@ -8,11 +8,11 @@ public class AbpKafkaOptions
{
public KafkaConnections Connections { get; }
public Action<ProducerConfig> ConfigureProducer { get; set; }
public Action<ProducerConfig>? ConfigureProducer { get; set; }
public Action<ConsumerConfig> ConfigureConsumer { get; set; }
public Action<ConsumerConfig>? ConfigureConsumer { get; set; }
public Action<TopicSpecification> ConfigureTopic { get; set; }
public Action<TopicSpecification>? ConfigureTopic { get; set; }
public AbpKafkaOptions()
{

@ -30,7 +30,7 @@ public class ConsumerPool : IConsumerPool, ISingletonDependency
Logger = new NullLogger<ConsumerPool>();
}
public virtual IConsumer<string, byte[]> Get(string groupId, string connectionName = null)
public virtual IConsumer<string, byte[]> Get(string groupId, string? connectionName = null)
{
connectionName ??= KafkaConnections.DefaultConnectionName;

@ -5,5 +5,5 @@ namespace Volo.Abp.Kafka;
public interface IConsumerPool : IDisposable
{
IConsumer<string, byte[]> Get(string groupId, string connectionName = null);
IConsumer<string, byte[]> Get(string groupId, string? connectionName = null);
}

@ -14,5 +14,5 @@ public interface IKafkaMessageConsumerFactory
IKafkaMessageConsumer Create(
string topicName,
string groupId,
string connectionName = null);
string? connectionName = null);
}

@ -5,5 +5,5 @@ namespace Volo.Abp.Kafka;
public interface IProducerPool : IDisposable
{
IProducer<string, byte[]> Get(string connectionName = null);
IProducer<string, byte[]> Get(string? connectionName = null);
}

@ -21,8 +21,10 @@ public class KafkaConnections : Dictionary<string, ClientConfig>
Default = new ClientConfig();
}
public ClientConfig GetOrDefault(string connectionName)
public ClientConfig GetOrDefault(string? connectionName)
{
connectionName ??= DefaultConnectionName;
if (TryGetValue(connectionName, out var connectionFactory))
{
return connectionFactory;

@ -31,13 +31,13 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency,
protected ConcurrentBag<Func<Message<string, byte[]>, Task>> Callbacks { get; }
protected IConsumer<string, byte[]> Consumer { get; private set; }
protected IConsumer<string, byte[]>? Consumer { get; private set; }
protected string ConnectionName { get; private set; }
protected string? ConnectionName { get; private set; }
protected string GroupId { get; private set; }
protected string GroupId { get; private set; } = default!;
protected string TopicName { get; private set; }
protected string TopicName { get; private set; } = default!;
public KafkaMessageConsumer(
IConsumerPool consumerPool,
@ -63,7 +63,7 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency,
public virtual void Initialize(
[NotNull] string topicName,
[NotNull] string groupId,
string connectionName = null)
string? connectionName = null)
{
Check.NotNull(topicName, nameof(topicName));
Check.NotNull(groupId, nameof(groupId));
@ -160,7 +160,7 @@ public class KafkaMessageConsumer : IKafkaMessageConsumer, ITransientDependency,
}
finally
{
Consumer.Commit(consumeResult);
Consumer?.Commit(consumeResult);
}
}

@ -16,7 +16,7 @@ public class KafkaMessageConsumerFactory : IKafkaMessageConsumerFactory, ISingle
public IKafkaMessageConsumer Create(
string topicName,
string groupId,
string connectionName = null)
string? connectionName = null)
{
var consumer = ServiceScope.ServiceProvider.GetRequiredService<KafkaMessageConsumer>();
consumer.Initialize(topicName, groupId, connectionName);

@ -32,7 +32,7 @@ public class ProducerPool : IProducerPool, ISingletonDependency
Logger = new NullLogger<ProducerPool>();
}
public virtual IProducer<string, byte[]> Get(string connectionName = null)
public virtual IProducer<string, byte[]> Get(string? connectionName = null)
{
connectionName ??= KafkaConnections.DefaultConnectionName;

Loading…
Cancel
Save