СоХабр закрыт.

С 13.05.2019 изменения постов больше не отслеживаются, и новые посты не сохраняются.

H ManualResetEventAsync в черновиках

.NET, C#
Возникла сегодня идея написать асинхронную версию ManualResetEvent, которая в задаче будет «ожидать» через await и при этом не занимать никакой поток.

В теории все просто, для стейтмашины нужен объект, который имеет метод GetAwaiter, который вернет awaiter, в котором реализован INotifyCompletion с OnCompleted, свойство IsCompleted и метод GetResult.

Awaiter:

    public class ManualResetEventAwaiter : INotifyCompletion
    {
        Action _continuation;
        bool _isCompleted = false;
        public void OnCompleted(Action continuation)
        {
            Volatile.Write(ref _continuation, continuation);
        }

        public bool IsCompleted => _isCompleted;

        public bool GetResult() => true;

        public ManualResetEventAwaiter GetAwaiter() => this;

        public void Continue()
        {
            Volatile.Write(ref _isCompleted, true);

            var continuation = Interlocked.Exchange(ref _continuation, null);

            if (continuation != null)
                Task.Run(continuation);
        }
    }

В OnCompleted стейтмашина передаст действие которое продолжит нашу логику выполнения. Для нас это воспринимается как логика после await.

В Continue будет вызываться это продолжение работы. Я обернул его в Task для того что бы «ожидающие таски» так же продолжились асинхронно.

ManualResetEventAsync:

    public class ManualResetEventAsync
    {
        ConcurrentQueue<ManualResetEventAwaiter> awaitQueue = null;

        public ManualResetEventAsync(bool initialState)
        {
            if (!initialState)
                Reset();
        }

        public ManualResetEventAwaiter WaitOneAsync()
        {
            var awaitable = new ManualResetEventAwaiter();

            var queue = Volatile.Read(ref awaitQueue);

            if (queue == null)
                awaitable.Continue();
            else
            {
                queue.Enqueue(awaitable);

                //check queue
                var upd_queue = Volatile.Read(ref awaitQueue);               

                if (!ReferenceEquals(queue, upd_queue))
                    awaitable.Continue();
            }

            return awaitable;
        }

        public void Set()
        {
            var queue = Interlocked.Exchange(ref awaitQueue, null);

            if (queue != null)
                while (queue.TryDequeue(out var awaitable))
                    awaitable.Continue();
        }

        public void Reset()
        {
            Interlocked.CompareExchange(
                ref awaitQueue,
                new ConcurrentQueue<ManualResetEventAwaiter>(),
                null);
        }
    }

Метод WaitOneAsync будет использоваться с await.

Set вызовет все продолжения и установит сигнальное состояние.

Reset установит не сигнальное состояние.

UPD: Упростил немного код.

Ссылка на проект
+2
~1600

комментарии (24)

0
+1 –1
Lelushak ,  
1. Писать такое нужно как extension к WaitHandle, т.к. благодаря этому поддержка асинхронного ожидания появится сразу у:

WaitHandle
AutoResetEvent
EventWaitHandle
ManualResetEvent
Mutex
Semaphore

И других его наследников.

2. Async and cancellation support for wait handles
–1
Ascar ,  
Цель была написать это именно самому. Мне вообще сейчас кажется не правильно мешать асинхронную модель с методами синхронизации потоков, все таки это разное. Название класса только пока оставлю схожее.
0
a-tk ,  
Тогда уже корректнее сказать, что цель — потренироваться писать такие вещи. Авось пригодится когда-нибудь навык.
0
Ascar ,  
Посмотрел исходники проекта, там сделано через TaskCompletionSource и ThreadPool.RegisterWaitForSingleObject который использует WaitHandle. Я же вообще не лезу в ThreadPool.
+2
mayorovp ,  

Вот только WaitHandle — это всегда объект ядра, а их использования зачастую хочется избежать.

0
Lelushak ,  
Можете привести примеры? На ум приходит только требование кроссплатформенности. Гораздо чаще хочется избежать оверхеда по памяти (но тут ваша реализация снизу спасает автора) и усложнения кода: придется переписывать все примитивы синхронизации и вообще сущности множатся

Если же говорить о разработке под Windows, то работа с WaitHandle позволит не писать обертки для работы со сторонними либами и использовать удобные конструкции типа cancellationToken.WaitHandle.WaitOneAsync()
+2
mayorovp ,   * (был изменён)

Любое взаимодействие с объектом ядра — это системный вызов, а системные вызовы нынче дороги. Конечно же, совсем без них не обойтись, но делать системный вызов для выполнения операции Set многими уже считается перебором.


Если же говорить о разработке под Windows, то работа с WaitHandle позволит не писать обертки для работы со сторонними либами

Это отдельная задача. Да, если у нас уже есть WaitHandle — то решение по вашей ссылке будет оптимальным. А вот при написании своего кода пригодится управляемая реализация.


и использовать удобные конструкции типа cancellationToken.WaitHandle.WaitOneAsync()

Обманчиво удобная конструкция: она создает лишний объект ядра, делая аж четыре системных вызова в том месте, где их можно вовсе избежать!


Вот так делать правильнее:


var tsc = new TaskCompletionSource<bool>();
using (cancellationToken.Register(() => tsc.SetResult(false))
    await tsc.Task;

Разумеется, такой код можно вынести в отдельный метод.

0
Lelushak ,  
Вот так делать правильнее


Тогда уж

var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);


Так как ваш вариант потенциально может привести к дедлоку из-за преобразования оставшейся части кода в синхронное «продолжение» таски
0
mayorovp ,  

Не очень понимаю где конкретно там может быть дедлок, но исправление нормальное. Тогда уж надо и ConfigureAwait добавить (при условии что это все вынесено в отдельный метод).

0
Lelushak ,   * (был изменён)
Синтетический пример с дедлоком:

var source = new CancellationTokenSource();
var cancellationToken = source.Token;
source.CancelAfter(2 * 1000);
bool cleaned = false;

cancellationToken.Register(() =>
{
	//some work
	cleaned = true;
});

var tsc = new TaskCompletionSource<bool>();
using (cancellationToken.Register(() => tsc.SetResult(false)))
	await tsc.Task;

while (true)
{
	if (!cleaned)
	{
		Thread.Sleep(100); //some synchronous work emulation
	}
	else break;
}

Console.WriteLine("Finished"); //never reach this


После await tsc.Task последующий код метода продолжит выполняться в том же потоке, второй делегат с cleaned=true не начнет выполняться до тех пор, пока не закончится синхронный код либо пока не вызовется какая-нибудь другая асинхронная функция (например await Task.Delay(1))
+1
mayorovp ,  

Ну, блок кода вида "синхронная работа в бесконечном цикле" в асинхронной программе — сам по себе ошибка, он много где ещё может выстрелить.


С тем же успехом можно было написать вот так и жаловаться на дедлоки:


cancellationToken.Register(() =>
{
    while (!cleaned)
    {
        Thread.Sleep(100); //some synchronous work emulation
    }
});
–1
Lelushak ,   * (был изменён)
Ну так дедлоки не от хорошего кода появляются :)

Ну и это все же другое. Код из моего примера плохой, но в общем-то легальный: в самих делегатах бесконечной работы нет и интуитивно ожидается, что они быстренько отработают друг за другом (так бы и произошло без await). Ломается он только из-за специфичных особенностей разворачивания await'ов компилятором и найти такую ошибку в сколько-нибудь большой системе будет сложнее, чем бесконечный цикл внутри делегата, особенно если не знать где искать. Так что лучше предохраниться где это возможно и возможно кто-нибудь из читателей этой ветки в будущем не выстрелит себе в ногу
+1
mayorovp ,  

У вас гонка между методами Set и WaitOneAsync.


Метод WaitOneAsync может положить awaitable в очередь уже в тот момент, когда метод Set закончил работу и ничего из очереди не читает...


Нужно, во-первых, сделать State volatile — а во вторых, проверять его уже после выполнения метода Enqueue.

–1
Ascar ,   * (был изменён)
State свойство. Когда Set закончит работу и State будет false только тогда можно в очередь добавлять, иначе возвращается завершенный awaiter. А если State внезапно станет true до завершения Set, то чтение TryDequeue прекратится.
0
mayorovp ,  

Вот только запросто может оказаться, что метод Set в одном потоке уже отработал, а State в другом потоке все ещё видно как false. Ибо кеш, реордеринг инструкций и прочее...

0
Ascar ,  
Вы предлагаете в get у State задать поле votatile?
0
mayorovp ,   * (был изменён)

Не очень понимаю при чем тут get, но да, State при вашем подходе должен быть volatile. Как раз это я и написал после слов "во-первых".

0
Ascar ,  
При том что свойство не делается volatile.
0
mayorovp ,  

А с каких пор в C# запретили обычную реализацию свойств и оставили только автосвойства?


И зачем вообще классу ManualResetEventAsync нужно открытое свойство State, которое невозможно безопасно использовать ни для чего кроме диагностики?

0
Ascar ,  
Вообщем выпилил вообще State.
+2
mayorovp ,  

Но, вообще говоря, вся логика, которую вы делали в ManualResetEventAwaiter, уже есть в TaskCompletionSource. Можно его и использовать. Заодно получится от очереди избавиться, ведь TaskCompletionSource поддерживает множественное ожидание...


public class ManualResetEventAsync
{
    private TaskCompletionSource<bool> tcs = null;

    public Task WaitOneAsync() 
    {
        var tcs = Volatile.Read(ref this.tcs);
        if (tcs == null)
            return Task.CompletedTask;
        else
            return tcs.Task;
    }

    public void Set()
    {
        var old_tcs = Interlocked.Exchange(ref this.tcs, null);
        old_tcs?.SetResult(false);
    }

    public void Reset()
    {
        var new_tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        Interlocked.CompareExchange(ref this.tcs, new_tcs, null);
    }
}
0
Ascar ,  
Ага, вариант работает, еще конструктор с сигнальным состоянием добавить.
0
AlexSys ,  
У Stephen Cleary есть AsyncManualResetEvent.
0
Ascar ,  
Как я уже и писал.
Посмотрите исходники, там идет обращение к потокам системы и к WaitHandle. В асинхронной модели это совсем не нужно.