Asynchronní volání pro Entity Framework a AsEnumerable

Tomáš Holan       13.07.2015       Entity Framework, LINQ, Threading       11037 zobrazení

Entity framework sám o sobě již nějakou dobu podporuje asynchronní volání. Při načítání dat pomoci entity frameworku je ale dost často využívaná metoda AsEnumerable, pomoci které můžeme části dotazu, které nelze provést přímo v SQL, nechat v druhém kroku vyhodnotit až na klientu. Takovýto dotaz ale standardně asynchronně volat nelze. Jak na to, si ukážeme v tomto článku.

Synchronní verze

Mějme následující dvě metody využívající pro načítání dat Entity Framework v kombinaci s použitím metody AsEnumerable:

public static class UzivateleService
{
    #region action methods
    public static IList<Models.Uzivatel> GetUzivatele(bool zahrnoutVyrazene)
    {
        using (var context = new DataContext())
        {
            var uzivatele = GetUzivatele(context.Uzivatele.Where(u => u.uziPlatnostDo == null || zahrnoutVyrazene)).ToList();
            return uzivatele;
        }
    }

    public static Models.Uzivatel GetUzivatel(int IDUzivatele)
    {
        using (var context = new DataContext())
        {
            return GetUzivatele(context.Uzivatele.Where(u => u.uziIDUzivatele == IDUzivatele)).FirstOrDefault();
        }
    }
    #endregion

    #region private member functions
    private static IEnumerable<Models.Uzivatel> GetUzivatele(IQueryable<Uzivatel> source)
    {
        var uzivatele = from result in
                            (from u in source
                            orderby u.uziPrijmeni, u.uziJmeno
                            select new
                            {
                                IDUzivatele = u.uziIDUzivatele,
                                Zapsano = u.uziZapsano,
                                Jmeno = u.uziJmeno,
                                Prijmeni = u.uziPrijmeni,
                                UzivatelLogin = u.UzivatelLogin,
                                Prihlaseni = (from prihlaseni in u.UzivatelPrihlaseni
                                                orderby prihlaseni.up_PosledniPrihlaseni descending
                                                select prihlaseni).FirstOrDefault(),
                                Vyrazen = u.uziPlatnostDo != null
                            }).AsEnumerable()
                        select new Models.Uzivatel()
                        {
                            IDUzivatele = result.IDUzivatele,
                            Zapsano = result.Zapsano,
                            Jmeno = result.Jmeno,
                            Prijmeni = result.Prijmeni,
                            PrihlasovaciJmena = string.Join(", ", result.UzivatelLogin.Select(o => o.ul_LoginName)),
                            PosledniPrihlaseni = result.Prihlaseni == null ? (DateTime?)null : result.Prihlaseni.up_PosledniPrihlaseni,
                            PosledniPrihlaseniPocitac = result.Prihlaseni == null ? null : result.Prihlaseni.up_PosledniPrihlaseniPocitac,
                            PosledniPrihlaseniOS = result.Prihlaseni == null ? null : result.Prihlaseni.up_PosledniPrihlaseniOS,
                            Vyrazen = result.Vyrazen
                        };

        return uzivatele;
    }
    #endregion
}

Kód je vcelku jednoduchý a jasný, obě metody GetUzivatele i GetUzivatel volají pomocnou metodu, která obsahuje logiku pro transformaci entity na model. Třídu entity ani modelu zde neuvádím, protože to jak vypadají je z příkladu zřejmé. Pomocná privátní metoda GetUzivatele používá v EF LINQ dotazu metodu AsEnumerable z toho důvodu, aby volání string.Join() proběhlo až na klientu po načtení dat z SQL Serveru (protože toto volání nelze přeložit a provést přímo součástí SQL dotazu).

Problém

Pokud tento kód budeme chtít převést na asynchronní verzi, narazíme. Pro asynchronní volání se v EF používají QueryableExtensions z namespace System.Data.Entity. Ty poskytují extension metody jako ToListAsync, FirstAsync, FirstOrDefaultAsync a další, které ve standardním případě pouze použijeme místo původních synchronních verzí a je hotovo. V našem případě by jsme tedy konkrétně chtěli použít v metodě GetUzivatele volání metody ToListAsync a v metodě GetUzivatel volání metody FirstOrDefaultAsync.

public static async Task<IList<Models.Uzivatel>> GetUzivatele(bool zahrnoutVyrazene, CancellationToken cancellationToken)
{
    using (var context = new DataContext())
    {
        var uzivatele = await GetUzivatele(context.Uzivatele.Where(u => u.uziPlatnostDo == null || zahrnoutVyrazene)).ToListAsync(cancellationToken);
        return uzivatele;
    }
}

public static async Task<Models.Uzivatel> GetUzivatel(int IDUzivatele, CancellationToken cancellationToken)
{
    using (var context = new DataContext())
    {
        return await GetUzivatele(context.Uzivatele.Where(u => u.uziIDUzivatele == IDUzivatele)).FirstOrDefaultAsync(cancellationToken);
    }
}

Tyto asynchronní metody jsou dostupné na rozhraní IQueryable<TSource>, kterým je zpravidla dotaz v EF reprezentován. V našem případě je ale po volaní AsEnumerable objekt reprezentující náš dotaz již typu IEnumerable<T>, kde tyto metody samozřejmě nejsou (a ani principiálně být nemohou, protože operace nad IEnumerable již probíhají v paměti dlouho potom, co vygenerovaný SQL dotaz již proběhl).

Zavedení IDbAsyncEnumerableExtensions

Co tedy s tím? V zásadě co tady potřebujeme je reprezentovat náš dotaz nějakým novým objektem jiného typu, na kterém ale bude také dostupná metoda ToListAsync (pro první případ) a FirstOrDefaultAsync (pro druhý případ). Tento objekt by měl jít získat voláním nové metody As<něco> na původním queryable, která se bude používat místo původní metody AsEnumerable u synchronní verze (tj. touto metodou řekneme, že chceme dále v LINQ dotazu pracovat již jen s “asynchronním” enumerable). Dále na nový objekt musíme umět aplikovat projekci (tj. bude muset implementovat metodu Select), přičemž objekt bude muset “vědět”, že při spuštění dotazu některou z asynchronních metod, má nejprve vykonat původní SQL dotaz a pak při enumeraci výsledků (v našem případě v anonymním typu) navíc již v paměti provádět definovanou projekci. Co tedy musíme udělat je přesně všechno toto.

Typ, kterým budeme reprezentovat asynchronní dotaz s možnou projekcí na klientu, bude existující typ z EF IDbAsyncEnumerable<TSource> z namespace System.Data.Entity.Infrastructure. Tento typ je totiž v EF interně použit právě pro asynchronní spouštění queryable dotazů a umožňuje projít výsledky dotazu pomoci “asynchronní verze” enumerátor kontraktu.

Toto si lze demonstrovat například kódem (kde query je libovolný EF dotaz typu IQueryable<T>):

var query = /*EF LINQ query*/

var source = (IDbAsyncEnumerable<T>)query;

using (var enumerator = source.GetAsyncEnumerator())
{
    while (true)
    {
        if (!await enumerator.MoveNextAsync(cancellationToken))
        {
            break;
        }

        //Use enumerator.Current
    }
}

K tomuto typu IDbAsyncEnumerable<TSource> naimplementujeme, jako vlastní extension metody, metody Select, ToListAsync, FirstAsync, FirstOrDefaultAsync a dále k typu IQueryable<T> naimplementujeme extension metodu AsDbAsyncEnumerable<T>.

internal static class DbAsyncEnumerableExtensions
{
    #region member types declarations
    private sealed class SelectDbAsyncEnumerable<TSource, TTarget> : IDbAsyncEnumerable<TTarget>
    {
        #region member types declarations
        private struct SelectDbAsyncEnumerator : IDbAsyncEnumerator<TTarget>
        {
            #region member varible and default property initialization
            private readonly IDbAsyncEnumerator<TSource> Source;
            private readonly Func<TSource, TTarget> Selector;
            #endregion

            #region constructors and destructors
            public SelectDbAsyncEnumerator(IDbAsyncEnumerator<TSource> source, Func<TSource, TTarget> selector)
            {
                this.Source = source;
                this.Selector = selector;
            }
            #endregion

            #region action methods
            public Task<bool> MoveNextAsync(CancellationToken cancellationToken)
            {
                return this.Source.MoveNextAsync(cancellationToken);
            }

            public void Dispose()
            {
                this.Source.Dispose();
            }
            #endregion

            #region property getters/setters
            public TTarget Current
            {
                get { return this.Selector(this.Source.Current); }
            }

            object IDbAsyncEnumerator.Current
            {
                get { return this.Current; }
            }
            #endregion
        }
        #endregion

        #region member varible and default property initialization
        private readonly IDbAsyncEnumerable<TSource> Source;
        private readonly Func<TSource, TTarget> Selector;
        #endregion

        #region constructors and destructors
        public SelectDbAsyncEnumerable(IDbAsyncEnumerable<TSource> source, Func<TSource, TTarget> selector)
        {
            this.Source = source;
            this.Selector = selector;
        }
        #endregion

        #region action methods
        public IDbAsyncEnumerator<TTarget> GetAsyncEnumerator()
        {
            return new SelectDbAsyncEnumerator(this.Source.GetAsyncEnumerator(), this.Selector);
        }

        IDbAsyncEnumerator IDbAsyncEnumerable.GetAsyncEnumerator()
        {
            return this.GetAsyncEnumerator();
        }
        #endregion
    }

    private struct QueryableWrapper<TSource> : IQueryable<TSource>, IDbAsyncEnumerable<TSource>
    {
        #region member varible and default property initialization
        private readonly IDbAsyncEnumerable<TSource> Source;
        #endregion

        #region constructors and destructors
        public QueryableWrapper(IDbAsyncEnumerable<TSource> source)
        {
            this.Source = source;
        }
        #endregion

        #region action methods
        public IDbAsyncEnumerator<TSource> GetAsyncEnumerator()
        {
            return this.Source.GetAsyncEnumerator();
        }

        IDbAsyncEnumerator IDbAsyncEnumerable.GetAsyncEnumerator()
        {
            return this.GetAsyncEnumerator();
        }

        public IEnumerator<TSource> GetEnumerator()
        {
            throw new NotSupportedException();
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }
        #endregion

        #region property getters/setters
        public Type ElementType
        {
            get { return typeof(TSource); }
        }

        public System.Linq.Expressions.Expression Expression
        {
            get { throw new NotSupportedException(); }
        }

        public IQueryProvider Provider
        {
            get { throw new NotSupportedException(); }
        }
        #endregion
    }
    #endregion

    #region action methods
    public static IDbAsyncEnumerable<TTarget> Select<TSource, TTarget>(this IDbAsyncEnumerable<TSource> source, Func<TSource, TTarget> selector)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (selector == null)
        {
            throw new ArgumentNullException(nameof(selector));
        }

        return new SelectDbAsyncEnumerable<TSource, TTarget>(source, selector);
    }

    public static Task<List<TSource>> ToListAsync<TSource>(this IDbAsyncEnumerable<TSource> source)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        return GetQueryableWrapper(source).ToListAsync();
    }

    public static Task<List<TSource>> ToListAsync<TSource>(this IDbAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        return GetQueryableWrapper(source).ToListAsync(cancellationToken);
    }

    public static Task<TSource> FirstAsync<TSource>(this IDbAsyncEnumerable<TSource> source)
    {
        return FirstAsync(source, CancellationToken.None);
    }

    public static async Task<TSource> FirstAsync<TSource>(this IDbAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        cancellationToken.ThrowIfCancellationRequested();

        using (var enumerator = source.GetAsyncEnumerator())
        {
            if (await enumerator.MoveNextAsync(cancellationToken))
            {
                return enumerator.Current;
            }
        }

        throw new InvalidOperationException("Sequence contains no elements");
    }

    public static Task<TSource> FirstOrDefaultAsync<TSource>(this IDbAsyncEnumerable<TSource> source)
    {
        return FirstOrDefaultAsync(source, CancellationToken.None);
    }

    public static async Task<TSource> FirstOrDefaultAsync<TSource>(this IDbAsyncEnumerable<TSource> source, CancellationToken cancellationToken)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        cancellationToken.ThrowIfCancellationRequested();

        using (var enumerator = source.GetAsyncEnumerator())
        {
            if (await enumerator.MoveNextAsync(cancellationToken))
            {
                return enumerator.Current;
            }
        }

        return default(TSource);
    }

    public static IDbAsyncEnumerable<T> AsDbAsyncEnumerable<T>(this IQueryable<T> source)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        var enumerable = source as IDbAsyncEnumerable<T>;
        if (enumerable == null)
        {
            throw new InvalidOperationException(string.Format("The source IQueryable doesn't implement IDbAsyncEnumerable{0}. Only sources that implement IDbAsyncEnumerable can be used for Entity Framework asynchronous operations. For more details see http://go.microsoft.com/fwlink/?LinkId=287068.", "<" + typeof(T) + ">"));
        }
        return enumerable;
    }
    #endregion

    #region private member functions
    private static IQueryable<TSource> GetQueryableWrapper<TSource>(IDbAsyncEnumerable<TSource> source)
    {
        return new QueryableWrapper<TSource>(source);
    }
    #endregion
}

Implementaci nebudu popisovat úplně do detailu, ale upozorním na několik použitých “triků”.

  • V metodě AsDbAsyncEnumerable stačí původní dotaz na IDbAsyncEnumerable<T> pouze přetypovat, protože původní dotaz již rozhraní IDbAsyncEnumerable<T> v EF automaticky implementuje. Mimochodem tato metoda je přesně takto implementovaná i přímo v QueryableExtensions, tam je ale pouze private.
  • V metodě ToListAsync potřebujeme zavolat původní implementaci této metody z EF. Protože ta je ale deklarovaná na IQueryable<T>, pomůžeme si tak, že nad naším IDbAsyncEnumerable<T> uděláme vlastní wrapper, který implementuje jak IQueryable<T> tak IDbAsyncEnumerable<T>. Implementace členů z IQueryable<T> stačí přitom prázdná, protože tu původní metoda ToListAsync nepoužívá (a my víme, že pro nic jiného wrapper použit nebude). Implementace IDbAsyncEnumerable<T> je nutná z toho důvodu, protože původní metoda ToListAsync provádí přetypování aktuálního objektu na IDbAsyncEnumerable<T>. Tato implementace pak ale jen deleguje volání GetAsyncEnumerator na původní wrappovaný objekt.
  • Metoda FirstAsync (resp. FirstOrDefaultAsync) musí být implementována obdobně jako implementace v LINQ to objects (ve třídě Enumerable), ale “asynchronně” nad objektem IDbAsyncEnumerable<T>. Původní implementaci z EF zde použít nelze, protože ty fungují nad queryable a modifikují expression, který se překládá do SQL, což není to co my tady potřebujeme.

Výsledná asynchronní verze

S použitím naší pomocné třídy DbAsyncEnumerableExtensions můžeme nyní již jednoduše dokončit asynchronní verzi původního příkladu.

public static class UzivateleService
{
    #region action methods
    public static async Task<IList<Models.Uzivatel>> GetUzivatele(bool zahrnoutVyrazene, CancellationToken cancellationToken)
    {
        using (var context = new DataContext())
        {
            var uzivatele = await GetUzivatele(context.Uzivatele.Where(u => u.uziPlatnostDo == null || zahrnoutVyrazene)).ToListAsync(cancellationToken);
            return uzivatele;
        }
    }

    public static async Task<Models.Uzivatel> GetUzivatel(int IDUzivatele, CancellationToken cancellationToken)
    {
        using (var context = new DataContext())
        {
            return await GetUzivatele(context.Uzivatele.Where(u => u.uziIDUzivatele == IDUzivatele)).FirstOrDefaultAsync(cancellationToken);
        }
    }
    #endregion

    #region private member functions
    private static IDbAsyncEnumerable<Models.Uzivatel> GetUzivatele(IQueryable<Uzivatel> source)
    {
        var uzivatele = from result in
                            (from u in source
                            orderby u.uziPrijmeni, u.uziJmeno
                            select new
                            {
                                IDUzivatele = u.uziIDUzivatele,
                                Zapsano = u.uziZapsano,
                                Jmeno = u.uziJmeno,
                                Prijmeni = u.uziPrijmeni,
                                UzivatelLogin = u.UzivatelLogin,
                                Prihlaseni = (from prihlaseni in u.UzivatelPrihlaseni
                                                orderby prihlaseni.up_PosledniPrihlaseni descending
                                                select prihlaseni).FirstOrDefault(),
                                Vyrazen = u.uziPlatnostDo != null
                            }).AsDbAsyncEnumerable()
                        select new Models.Uzivatel()
                        {
                            IDUzivatele = result.IDUzivatele,
                            Zapsano = result.Zapsano,
                            Jmeno = result.Jmeno,
                            Prijmeni = result.Prijmeni,
                            PrihlasovaciJmena = string.Join(", ", result.UzivatelLogin.Select(o => o.ul_LoginName)),
                            PosledniPrihlaseni = result.Prihlaseni == null ? (DateTime?)null : result.Prihlaseni.up_PosledniPrihlaseni,
                            PosledniPrihlaseniPocitac = result.Prihlaseni == null ? null : result.Prihlaseni.up_PosledniPrihlaseniPocitac,
                            PosledniPrihlaseniOS = result.Prihlaseni == null ? null : result.Prihlaseni.up_PosledniPrihlaseniOS,
                            Vyrazen = result.Vyrazen
                        };

        return uzivatele;
    }
    #endregion
}

Závěrem pouze upozorním, že řešení tohoto problému jsem nikde v oficiálních ani neoficiálních zdrojích nenašel, a musel jsem ho kompletně vymyslet a implementovat sám. Z tohoto důvodu přivítám k mojí implementaci jakékoliv vaše podměty.

 

hodnocení článku

1 bodů / 1 hlasů       Hodnotit mohou jen registrované uživatelé.

 

Nový příspěvek

 

Příspěvky zaslané pod tento článek se neobjeví hned, ale až po schválení administrátorem.

Jazyk

Snad už nikdo neprogramuje v češtině, a ta je tak v kódu jen jen pro ilustraci. :-)

nahlásit spamnahlásit spam -1 / 1 odpovědětodpovědět

Vlastní implementace

Divím se, že to nikdo ještě neřešil.

nahlásit spamnahlásit spam -1 / 1 odpovědětodpovědět
                       
Nadpis:
Antispam: Komu se občas házejí perly?
Příspěvek bude publikován pod identitou   anonym.

Nyní zakládáte pod článkem nové diskusní vlákno.
Pokud chcete reagovat na jiný příspěvek, klikněte na tlačítko "Odpovědět" u některého diskusního příspěvku.

Nyní odpovídáte na příspěvek pod článkem. Nebo chcete raději založit nové vlákno?

 

  • Administrátoři si vyhrazují právo komentáře upravovat či mazat bez udání důvodu.
    Mazány budou zejména komentáře obsahující vulgarity nebo porušující pravidla publikování.
  • Pokud nejste zaregistrováni, Vaše IP adresa bude zveřejněna. Pokud s tímto nesouhlasíte, příspěvek neodesílejte.

Příspěvky zaslané pod tento článek se neobjeví hned, ale až po schválení administrátorem.

přihlásit pomocí externího účtu

přihlásit pomocí jména a hesla

Uživatel:
Heslo:

zapomenuté heslo

 

založit nový uživatelský účet

zaregistrujte se

 
zavřít

Nahlásit spam

Opravdu chcete tento příspěvek nahlásit pro porušování pravidel fóra?

Nahlásit Zrušit

Chyba

zavřít

feedback