System.Reactive 6.0.0-preview.16
Rx (Reactive Extensions for .NET)
Rx enables event-driven programming with a composable, declarative model.
Getting started
Run the following at a command line:
mkdir TryRx
cd TryRx
dotnet new console
dotnet add package System.Reactive
Alternatively, if you have Visual Studio installed, create a new .NET Console project, and then use the NuGet package manager to add a reference to System.Reactive
.
You can then add this code to your Program.cs
. This creates an observable source (ticks
) that produces an event once every second. It also adds a handler to that source that writes a message to the console for each event:
using System.Reactive.Linq;
IObservable<long> ticks = Observable.Timer(
dueTime: TimeSpan.Zero,
period: TimeSpan.FromSeconds(1));
ticks.Subscribe(
tick => Console.WriteLine($"Tick {tick}"));
Console.ReadLine();
Examples
Wrapping an existing event source as an Rx IObservable<T>
If you have an existing source of events that does not support Rx directly, but which does offer .NET events, you can bring this into the world of Rx using the Observable.FromEventPattern
method:
using System.Reactive.Linq;
FileSystemWatcher fsw = new FileSystemWatcher(@"C:\temp");
IObservable<FileSystemEventArgs> changeEvents = Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Changed += h,
h => fsw.Changed -= h)
.Select(e => e.EventArgs);
fsw.EnableRaisingEvents = true;
Waiting for inactivity
It can sometimes be useful to wait for a period of inactivity before taking action. For example, if you have code that monitors a directory in a filesystem, processing modified or added files, it's common for there to be flurries of activity. For example, if a user is copying multiple files into a folder that you're observing, there will be multiple changes, and it could be more efficient to wait until those stop and then process all the changes in one batch, than to attempt to process everything immediately.
This example defines a custom Rx operator that can be attached to any source. It will wait for that source to start producing events, and then, it will wait for it to stop again for the specified period. Each time that happens, it reports all of the activity that occurred between the last two periods of inactivity:
static class RxExt
{
public static IObservable<IList<T>> Quiescent<T>(
this IObservable<T> src,
TimeSpan minimumInactivityPeriod,
IScheduler scheduler)
{
IObservable<int> onoffs =
from _ in src
from delta in Observable.Return(1, scheduler).Concat(Observable.Return(-1, scheduler).Delay(minimumInactivityPeriod, scheduler))
select delta;
IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);
return src.Buffer(zeroCrossings);
}
}
(This works by creating a sequence (onoffs
) that produces a value 1 each time activity occurs, and then a corresponding -1 after the specified time has elapsed. It then uses Scan
to produce the outstanding
sequence, which is just a running total of those onoffs
. This is effectively a count of the number of events that have happened recently (where 'recently' is defined as 'less than minimumInactivityPeriod
ago). Every new event that occurs raises this running total by 1, but each time the specified timespan has passed for a particular event, it drops by one. So when this drops back to 0, it means that there are no events that have occurred as recently as the minimumInactivityPeriod
. The zeroCrossings
sequence picks out just the events in which outstanding
drops back to zero. This has the effect that zeroCrossings
raises an event every time there has been some activity followed by minimumInactivityPeriod
of inactivity. Finally, we plug this into the Buffer
operator, which slices the input events (src
) into chunks. By passing it the zeroCrossings
source, we tell Buffer
to deliver a new slice every time the source becomes inactive. The effect is that the source returned by Quiescent
does nothing until there has been some activity followed by the specified period of inactivity, at which point it produces a single event reporting all of the source events that have occurred since the previous period, or in the initial case, all of the source events so far.)
You could use this in conjunction with the adapted FileSystemWatcher
from the preceding example:
IObservable<IList<FileSystemEventArgs>> fileActivityStopped = changeEvents
.Quiescent(TimeSpan.FromSeconds(2), Scheduler.Default);
await fileActivityStopped.ForEachAsync(
a => Console.WriteLine($"File changes stopped after {a.Count} changes"));
(Note: this only uses the Changed
event. A real application might also need to look at the FileSystemWatcher
's Created
, Renamed
, and Deleted
events.)
Feedback
You can create issues at the https://github.com/dotnet/reactive repository
Showing the top 20 packages that depend on System.Reactive.
Packages | Downloads |
---|---|
System.Reactive.Windows.Threading
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
40 |
System.Reactive.Core
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
39 |
System.Reactive.Linq
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
34 |
System.Reactive.PlatformServices
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
33 |
System.Reactive.Interfaces
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
33 |
System.Reactive.Interfaces
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
31 |
System.Reactive.PlatformServices
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
30 |
System.Reactive.Interfaces
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
29 |
System.Reactive.PlatformServices
Legacy facade for Reactive Extensions (Rx) for .NET
|
29 |
System.Reactive.WindowsRuntime
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
29 |
System.Reactive.Core
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
29 |
Discord.Net.Interactions
A Discord.Net extension adding support for Application Commands.
|
28 |
System.Reactive.WindowsRuntime
Legacy facade for Reactive Extensions (Rx) for .NET
|
28 |
System.Reactive.Windows.Threading
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
28 |
System.Reactive.Core
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
27 |
System.Reactive.Windows.Threading
Reactive Extensions (Rx) for .NET - v3 compatibility facade for
|
27 |
.NET Framework 4.7.2
- System.Threading.Tasks.Extensions (>= 4.5.4)
.NET 6.0
- No dependencies.
.NET 6.0
- No dependencies.
.NET Standard 2.0
- System.Threading.Tasks.Extensions (>= 4.5.4)
UAP 10.0.18362
- System.Threading.Tasks.Extensions (>= 4.5.4)
Version | Downloads | Last updated |
---|---|---|
6.0.1 | 5 | 06/08/2024 |
6.0.1-preview.1 | 43 | 08/08/2023 |
6.0.0 | 12 | 06/13/2023 |
6.0.0-preview.16 | 12 | 08/25/2023 |
6.0.0-preview.13 | 28 | 06/07/2023 |
6.0.0-preview.9 | 13 | 02/10/2024 |
6.0.0-preview.1 | 30 | 07/30/2023 |
5.0.0 | 42 | 02/22/2023 |
5.0.0-preview.220 | 17 | 07/12/2023 |
5.0.0-preview.16 | 24 | 08/27/2023 |
4.4.1 | 22 | 06/14/2023 |
4.3.2 | 14 | 06/13/2023 |
4.3.1 | 24 | 06/14/2023 |
4.2.2 | 19 | 07/12/2023 |
4.2.0 | 15 | 06/14/2023 |
4.2.0-preview.625 | 25 | 07/31/2023 |
4.2.0-preview.566 | 38 | 07/14/2023 |
4.2.0-preview.102 | 7 | 08/26/2023 |
4.2.0-preview.63 | 26 | 08/02/2023 |
4.1.6 | 17 | 07/30/2023 |
4.1.5 | 6 | 06/14/2023 |
4.1.4 | 18 | 08/30/2023 |
4.1.3 | 25 | 06/13/2023 |
4.1.2 | 15 | 06/13/2023 |
4.1.1 | 13 | 06/13/2023 |
4.1.0 | 12 | 06/13/2023 |
4.1.0-preview.330 | 12 | 08/26/2023 |
4.1.0-preview.84 | 10 | 08/29/2023 |
4.0.0 | 21 | 06/13/2023 |
4.0.0-preview.4.build.391 | 16 | 08/02/2023 |
4.0.0-preview.3.build.380 | 16 | 08/28/2023 |
4.0.0-preview.2.build.379 | 14 | 08/26/2023 |
3.1.1 | 19 | 06/13/2023 |
3.1.0 | 15 | 08/09/2023 |
3.1.0-rc | 24 | 05/20/2023 |
3.0.0 | 27 | 08/06/2023 |