This project is read-only.

async/await, IObservable, and yield return

Topics: C# Language Design
Apr 8, 2014 at 8:47 PM
This will likely be a complex feature, but I would like to have the ability to yield return a T in an async/await method using IObservable<T>

so for example:
public async IObservable<string> GetWebStuff(IEnumerable<string> addresses)
{
   var httpClient = new ...;

   foreach(var address in addresses)
   {
      var response = await httpClient.GetAsync(address);

      yield return await response.Content.ReadAsStringAsync();
   }
}
Since IObservable is in the BCL this should be easily supported, and not require a library like Rx to be present, though it could certainly work with it.
Apr 8, 2014 at 9:16 PM
I don't think I completely understand what you're looking for. What would yield return do to IObservable? yield return is meant to return to IEnumerable only, if I'm not mistaken. You'd at least need an iterative type.

Also, did you purposefully omit Task<T> from your method return type like this? You'd need it to make async make sense.
public async Task<IObservable<string>> GetWebStuff(IEnumerable<string> addresses)
{
   ...
}
Or are you talking about something like this? I've wanted this before as well, especially when reading Streams that contain arrays of data, but I suspect this would cause blocking somewhere along the line. And I'm not sure how this would be used in a real situation without having to fundamentally change some aspects of the way, for example, foreach works.
public async Task<IEnumerable<IObservable<string>>> GetWebStuff(IEnumerable<string> addresses)
{
   ...
}
Apr 8, 2014 at 9:35 PM
An IOservable<T> is effectively the combination of IEnumerable<T> and Task<T>. The Reactive Extensions has a good description for the differences.
Apr 8, 2014 at 10:22 PM
Edited Apr 8, 2014 at 11:00 PM
As I understand the original posting, the request is to support implicit creation of IObservable<T> in the same way like IEnumerable<T> through yield construct.

The code
public async IObservable<string> GetWebStuff(IEnumerable<string> addresses)
{
   var httpClient = new ...;

   foreach(var address in addresses)
   {
      var response = await httpClient.GetAsync(address);

      yield return await response.Content.ReadAsStringAsync();
   }
}
is going to be represented by something like
public IObservable<string> GetWebStuff(IEnumerable<string> addresses)
{
   var subject = new Subject<string>();
   Task.Run(async () =>
   {
       try
       {
           var httpClient = new ...;

           foreach(var address in addresses)
           {
              var response = await httpClient.GetAsync(address);
              subject.OnNext(await response.Content.ReadAsStringAsync());
           }

           subject.OnCompleted();
        }
        catch (Exception e)
        {
            subject.OnError(e);
        }
    });
    return subject.AsObservable();
}
Apr 9, 2014 at 12:12 PM
Edited Apr 9, 2014 at 5:29 PM
In my point of view, present IObservable<T> feel so Java. But because IObservable<T> has been baked into BCL, maybe better add new interfaces?
public interface IAsyncEnumerable<T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator();
}

public interface IAsyncEnumerator<T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}
Then they should be able to be the return value of async methods:
async IAsyncEnumerable<string> GetAsyncForeachable()
{
    for (int i = 0; i < 10; i++)
    {
        var ret = await DownloadString1Async(i);
        ret += await DownloadString2Async(i);
        yield return ret;
    }
}
The above code should be similar to something like below today:
IObservable<string> GetObservable()
{
    return Observable.Create(async observer =>
    {
        try
        {
            for ( int i = 0; i < 10; i++ )
            {
                var ret = await DownloadString1Async(i);
                ret += await DownloadString2Async(i);
                observer.OnNext(ret);
            }

            observer.OnCompleted();
        }
        catch ( Exception e )
        {
            observer.OnError(e);
        }

        return Disposable.Empty;
    });
}
IAsyncEnumerable<T> can be consumed by foreach ( async .. ) inside async methods:
async Task Consuming( IAsyncEnumerable<T> xs )
{
    foreach ( async var x in xs )
    {
        Console.WriteLine(x);
    }

    // Which is equivalent to
    using ( var af = xs.GetAsyncEnumerator() )
    {
        while ( await af.MoveNext() )
        {
            Console.WriteLine(af.Current);
        }
    }
}
And of course can be used to compose new async enumerable objects:
static async IAsyncEnumerable<T> Where<T>( this IAsyncEnumerable<T> xs, Func<T, bool> pred )
{
    foreach ( async var x in xs )
    {
        if (pred(x))
        {
            yield return x;
        } 
    }
}
Apr 9, 2014 at 12:19 PM
Edited Apr 9, 2014 at 12:25 PM
@diryboy: Your proposal seems to be a lot of work on language side. Could you please elaborate more on real problems with IObservable<T>? (Just "Java feeling" doesn't explain much to me.) Why is it worth bothering?

Right now, async and enumerations are separate concepts, and IObservable<T> is kind of dual of IEnumerable<T>, whereas you propose some mix between the two.
Apr 9, 2014 at 12:25 PM
Edited Apr 9, 2014 at 6:14 PM
-- The + sign got replaced by &#43; automatically during editing in my last reply. --
Turned out to be codeplex bug in reply edit, refresh page looks correct, ignore this reply please.
Apr 9, 2014 at 9:14 PM
@diryboy You don't need that try-catch, Observable.Create() already calls OnError() by itself if the delegate throws.

The big advantage of async Task code is that it completely rewrites your code, performing transformation that's hard and error-prone to do by hand.

The advantage of async IObservable is what exactly? Saving a couple lines of boilerplate code?

(Don't get me wrong, avoiding boilerplate is a good thing, but this feature doesn't seem to be worth it to me.)
Apr 10, 2014 at 12:35 AM
Edited Apr 10, 2014 at 1:06 AM
That boilerplate is only limited because Reactive.NET does most of the work already. However, that is not in the BCL, and to write an IObservable<T> manually without Reactive.NET is not insignificant code. It doesn't require the compiler chicanery that writing an iterator does, that's for sure. However, it's my opinion that supporting both with a common syntax is a benefit for the sake of consistency.

Of course with generation I also think that syntax should be added to aid in consumption of IObservable<T> from within an async method. Since they feel a lot like an IEnumerable<T>, except asynchronous, I think something like "foreach" would make a lot of sense.
public async IObservable<int> GetValues()
{
    for (int i = 0; i < 10; I++)
    {
        await Task.Delay(1000);
        yield return i;
    }
}

public async Task<int> AddValues()
{
    IObservable<int> values = GetValues();
    int tally = 0;
    foreach (await int value in values)
    {
        tally += value;
    }
    return tally;
}
Bring Reactive.NET's query operators into play and you would have a fantastic framework for working with asynchronous "push" sequences of values.

I've submitted my own UserVoice suggestion for both features here:

Add Support for Async/Await with IObservable<T>
Apr 10, 2014 at 1:55 AM
By "Java feel", I mean the IObservable could look like:
public interface IMyObservable<T>
{
    event Action<T> Next;
    event Action Completed;
    event Action<Exception> Error;
}
Don't get me wrong, I do think the current IObservable and IObserver interfaces are the perfect dual of IEnumerable and IEnumerator.

BTW my IAsyncEnumerator should be disposable to support break inside foreach so I edited that.

I read this post by Lucian, he mentioned that early prototype of async method enabled async iterators, and other interesting things.

The difference between my IAsyncEnumerator and the BCL IObserver is that, MoveNext means lazy/cold, you'll be sure that IAsyncEnumerable only give you value when you're "foreaching" it, however, if someone gave you an IObservable, you wouldn't know if it's hot or cold without looking into documentation or asking the person.

Let's say, we need to download images from the URLs given from some source. Maybe regex match results from a lot of files in some folder or even http responses, we don't care, all we know is IObservable<string>, we want some rule of stop continuing to receive new urls, and want this method async Task:
async Task CrawlLinkContents( IObservable<string> urls )
{
    int bad = 0, maxBad = 100;
    var downloads = new List<Task>();
    var disposable = urls.Where(url =>
    {
        var isBad = IsBad(url);
        if ( isBad )
        {
            bad++;
        }
        return !isBad;
    })
    .TakeWhile(url => bad < maxBad)
    .Subscribe(url =>
    {
        var dt = ScheduleAndDownloadAsync(url);
        downloads.Add(dt);
    });

    await Task.WhenAll(downloads);
}
I have to call several combinators with shared variables captured.

If given an IAsyncEnumerable<string>, it could be written like:
async Task CrawlLinkContents( IAsyncEnumerable<string> urls )
{
    int bad = 0, maxBad = 100;
    var downloads = new List<Task>();
    foreach ( async var url in urls )
    {
        if ( IsBad(url) )
        {
            bad++;
            continue;
        }

        if ( bad >= maxBad )
            break;

        var dt = ScheduleAndDownloadAsync(url);
        downloads.Add(dt);
    }

    await Task.WhenAll(downloads);
}
Translating the above async foreach loop is quite easy:
using( var af = urls.GetAsyncEnumerator() )
{
    while ( await af.MoveNext() )
    {
        var url = af.Current;

        if ( IsBad(url) )
        {
            bad++;
            continue;
        }

        if ( bad >= maxBad )
            break;

        var dt = ScheduleAndDownloadAsync(url);
        downloads.Add(dt);
    }
}
Apr 10, 2014 at 2:26 PM
@diryboy You can get quite close to that using TPL Dataflow:
async Task CrawlLinkContents( ISourceBlock<string> urls )
{
    int bad = 0, maxBad = 100;
    var downloads = new List<Task>();
    while (await urls.OutputAvailableAsync())
    {
        var url = await urls.ReceiveAsync();

        if ( IsBad(url) )
        {
            bad++;
            continue;
        }

        if ( bad >= maxBad )
            break;

        var dt = ScheduleAndDownloadAsync(url);
        downloads.Add(dt);
    }

    await Task.WhenAll(downloads);
}
Apr 10, 2014 at 2:36 PM
Halo_Four wrote:
That boilerplate is only limited because Reactive.NET does most of the work already. However, that is not in the BCL, and to write an IObservable<T> manually without Reactive.NET is not insignificant code. It doesn't require the compiler chicanery that writing an iterator does, that's for sure. However, it's my opinion that supporting both with a common syntax is a benefit for the sake of consistency.
I think one of the guidelines most languages follow is: "If something can be done in a library, then it should stay in a library. We don't need to complicate the language with it."

What you said sounds mostly like an argument to include Rx in the BCL to me, not an argument for new syntax.
Apr 10, 2014 at 2:51 PM
I think one of the guidelines most languages follow is: "If something can be done in a library, then it should stay in a library. We don't need to complicate the language with it."
I won't disagree with that. With Rx creating observables in a manner that feels like an iterator isn't that complicated and can be asynchronous. The consumption also isn't difficult but it definitely feels at odds with the way await works. That said, there wasn't really all that much that async/await added to task continuations that wasn't already possible aside to make for cleaner code.
What you said sounds mostly like an argument to include Rx in the BCL to me
I'd be all for that. Then maybe Reactive.NET can see some love with the rest of the framework instead of feeling like it's constantly fighting it. I also think that if Rx was in the BCL when C# 5.0 asynchrony was being designed that we wouldn't be having this conversation.
Apr 11, 2014 at 5:31 AM
svick wrote:
@diryboy You can get quite close to that using TPL Dataflow:
Ah I didn't know that before, thank you for bringing this to me. So my IAsyncEnumerable interfaces looks not necessary, neither does the foreach consuming.
Aug 12, 2014 at 3:06 AM
Hi,

I was just about to request this feature and found this thread. :)

A while ago I wrote a blog post highlighting the benefits of a truly async iterator (IObservable<T>) rather than the commonly proposed yet very different (and incorrect) alternatives; e.g., IAsyncEnumerable<T> and IEnumerable<Task<T>>.

Note that Rx 2.0 does provide a very nice alternative in the form of an API. Here's what the OP's example would look like in Rx 2.0:
public IObservable<string> GetWebStuff(IEnumerable<string> addresses)
{
  return Observable.Create<string>(async (observer, cancel) =>
  {
    var httpClient = new ...;

    foreach (var address in addresses)
    {
      var response = await httpClient.GetAsync(address, cancel);

      observer.OnNext(await response.Content.ReadAsStringAsync(cancel));
    }
  });
}
However, keep in mind that a simplified example such as this doesn't truly illustrate the power of an async iterator, much like when iterator blocks were first introduced into C# and the samples were pretty lame (no offense ;). I think many devs stated that iterator blocks were a relatively useless and confusing feature, including probably myself, yet now I use iterator blocks all the time. I use async/await all of the time now too. Imagine if iterator blocks were asynchronous...

But for completion's sake, here's an alternative way to write this particular query without an async iterator. The benefit of the following query over the former is concurrency; i.e., web requests among the different addresses are sent concurrently, rather than sequentially as in the async iterator above.
var webStuff = 
  from address in addresses.ToObservable()
  select Observable.Using(() => new HttpClient(...), (httpClient, cancel) => 
    from response in httpClient.GetAsync(address, cancel)
    from content in response.Content.ReadAsStringAsync(cancel)
    select content)
   .Merge(/* optional max-concurrency parameter */);
Perhaps there isn't a huge benefit to having async iterators as a C# language feature since it's possible to use Observable.Create in Rx 2.0. But then again, iterator blocks could have been implemented similarly as an API without being a language feature (e.g., Enumerable.Create), yet we've got them in C#.

There's just something really appealing about having a very useful, though fairly obscure, feature being promoted by your favorite language as a first-class citizen. At the very least it will expose many developers to the benefits of reactive programming.

Furthermore, async iterators fill in a conceptual gap:
        Sync Scalar    Async Scalar    Sync Sequence    Async Sequence
        -----------    ------------    -------------    --------------
Type:   T              Task<T>         IEnumerable<T>   IObservable<T>
C#:     return         await           yield return     Not supported (why?)
- Dave
Aug 12, 2014 at 11:58 AM
davedev wrote:
But then again, iterator blocks could have been implemented similarly as an API without being a language feature (e.g., Enumerable.Create), yet we've got them in C#.
I don't think they could have been implement that way. Observable.Create() works, because IObservable is "push", i.e. the consumer code is after the producer code in the call stack. So, observer.OnNext() just calls the consumer code, and everything works like it should.

But IEnumerable is "pull" (consumer code is before the producer code in the call stack). So yield return has to return from the current method and you can't do that with just a method call. I think you could implement Enumerable.Create() using await and custom awaitable, but yield return predates await by quite a few years.
Aug 12, 2014 at 12:11 PM
@svick: Actually, it already exists. https://www.nuget.org/packages/Ix-Main
public static IEnumerable<T> Create<T>(Action<IYielder<T>> create);
Just an API. No magic or voodoo required ;)
Aug 12, 2014 at 12:14 PM
@svick: And keep in mind that iterator blocks are really just a syntactic trick. They are rewritten by the compiler into a state machine. There has never been anything preventing Enumerable.Create from being implemented since C# 1.0.
Aug 12, 2014 at 12:27 PM
@svick: Just noticed that internally IYielder is implemented with Task as you suggested, but that's only to allow C# to generate the state machine itself as a convenience. My point was that the IEnumerator<T> state machine could be implemented manually with only C# 1.0, though of course our code would look a bit strange. :)
Aug 12, 2014 at 2:08 PM
davedev wrote:
@svick: And keep in mind that iterator blocks are really just a syntactic trick. They are rewritten by the compiler into a state machine. There has never been anything preventing Enumerable.Create from being implemented since C# 1.0.
No, you can't implement Enumerable.Create using just a library, exactly because you need the compiler trick to rewrite your code into a state machine; library can't do that.

davedev wrote:
My point was that the IEnumerator<T> state machine could be implemented manually with only C# 1.0, though of course our code would look a bit strange. :)
Yes, but writing that state machine by hand is very tedious and hard, which is the whole reason why yield return exists. On the other hand, using Observable.Create() is not hard and only slightly tedious.
Aug 12, 2014 at 4:02 PM
Edited Aug 12, 2014 at 4:07 PM
@svick: We're off topic now. But I don't think it would be hard at all -- it would be something like this:

(Edit: Created a closure to capture the state.)
var i = 0;
var e = EnumerableEx.Create<string>(yielder => 
{
  switch (yielder.Case)
  {
    case 0:
      return yielder.Yield("Hello");
    case 1:
      return yielder.Yield("World!");
    case 2:
      return yielder.Yield("Counter:");
    default:
      if (i < 10)
      {
        return yielder.Yield(i++);
      }
      return yielder.Break();
  }
});
Anyway, to bring it back on point, I think native async iterators would provide much of the same benefits, though it's true that Observable.Create is not nearly as messy as the above example. So it's not a requirement, but it's nice to have and would simplify things a bit.
Aug 12, 2014 at 4:12 PM
@svick: Although, you're comparing apples to oranges. The reason that Observable.Create isn't as "hard" and is only "slightly tedious" is because it takes advantage of await, just like IEnumerable<T> Create<T>(Action<IYielder<T>> create). If you want to compare apples to apples, then the simple state machine in my previous post is analogous to a hypothetical implementation of Observable.Create that doesn't take advantage of either await or iterator blocks (as the original Rx-Experimental release did).

So we have sync iterators, why not async iterators.
Aug 12, 2014 at 8:02 PM
Speaking of Yield and Iterator another potentially useful capability would be a static yield.
Recurse<T> ( n : Node<T> ) : IEnumerable<T>
{
  if( n is null ) return
  Recurse<T>( n.Left )
  static yield n  
  Recurse<T>( n.Righ )
}
When the iterator funcction Recurse is called externally a new iterator is created, but when the iterator function Recurse is called internally the same iterator is used.
This would make it O(n) rather that O(n^2)

This different to Yield! , which is effectively the same as.
foreach( item in thisIEnumerable)
  yield item
Aug 12, 2014 at 9:05 PM
@davedev I think that your switch-based Enumerable.Create() would be of very limited use. Once you start wanting to have ifs or whiles, you won't know at which position should what code be executed. Yes, you can build the state machine yourself (where you set the next state, not Enumerable.Create()), but if you've ever seen the state machine for even a moderately complicated yield return (or async) method, you know that's not feasible.

And Rx Observable.Create() doesn't use await the same way Ix Enumerable.Create() does: with Ix, you have to use await to be able to yield (e.g. await yielder.Yield(42);); with Rx, you can use await to perform some asynchronous operations, but you don't need it to yield (it's just observer.Next(42);), there are even overloads that don't accept async lambdas:
Observable.Create<int>(
    observer =>
    {
        observer.OnNext(42);

        return Disposable.Empty;
    });

Aug 12, 2014 at 9:19 PM
I'm going to make the argument that since/until Rx is a part of the BCL that it should effectively be treated like it doesn't exist when it comes to syntax candy offered by the language. However, System.IObservable<T> and System.IObserver<T> are BCL and it makes sense to offer syntax to simplify usage of asynchronous sequences, both creating and consuming them.

That said I'll reiterate that I think that incorporating/merging Rx into the BCL makes a huge amount of sense in terms of the charter and evolution of the language given it marries the core concepts of C# 3.0 (LINQ) with the core concepts of C# 5.0 (Async). Combined with the pattern matching proposals I think it would really position C# in a great place with reactive-style programming.
Aug 13, 2014 at 12:16 AM
@svick: Yes, I understand observables and Rx. But you're comparing a poor man's coroutine to call\cc. Again, it's apples to oranges. They both rely on continuations, but their directions are reversed.

You started by claiming that Enumerable.Create is impossible and now you're just saying it would be of very limited use to implement manually. Yield and await are definitely not required, but they are certainly desired. I get your point.

But what is your real point? You don't think that async iterators are a good fit in C# because I'm making it sound too easy to implement state machines for iterator blocks? I'm not making the connection here. What's your vote?

I believe that the conversation started because you didn't like my previous statement:
Perhaps there isn't a huge benefit to having async iterators as a C# language feature since it's possible to use Observable.Create in Rx 2.0. But then again, iterator blocks could have been implemented similarly as an API without being a language feature (e.g., Enumerable.Create), yet we've got them in C#.
I stand by this statement. It's just an analogy with some overlap. I wasn't claiming that they are identical or that observables are as complex as enumerables. But your point is that they are very different and so the analogy is bad. Great, I'll concede your point. It was a bad analogy.
Aug 13, 2014 at 9:56 AM
davedev wrote:
But what is your real point? You don't think that async iterators are a good fit in C# because I'm making it sound too easy to implement state machines for iterator blocks? I'm not making the connection here. What's your vote?
My real point is that I don't consider async iterators to be worth it. yield return is a huge improvement when compared with implementing the state machine manually. And it's also big improvement over any form of Enumerable.Create(), because those will always have some big drawbacks.

When I said that Enumerable.Create() is impossible, I meant, that Enumerable.Create() that would work for IEnumerable as well as the existing Observable.Create() works for IObservable is. And I still stand by that.

But async iterators offer only trivial improvements (i.e. less boilerplate code) over Observable.Create(), which isn't enough to justify their existence for me.
Aug 13, 2014 at 5:10 PM
@svick:
My real point is that I don't consider async iterators to be worth it.
But your arguments against my admittedly unfair analogy don't cover any of my primary arguments for this feature. You didn't even address the paragraphs that followed, which also showed that the point of my analogy was actually that introducing iterator blocks into C# put them into the minds of many developers, who then went on to figure out really good, common uses for them. Most devs rarely even considered that iterator blocks would be even remotely useful in C# and now we use them all the time. That was my point, but I still concede that the analogy was unfair for the simple reason that we now have async coroutines as a first-class citizen (async/await), yet iterator blocks were technically the first coroutine(-like) implementation in C#, so they had nothing to lean on. My analogy ambiguously implied that I wanted async iterators because it would simplify writing them, but that was never my argument... I even presented a code example to the contrary in that same post and stated "Note that Rx 2.0 does provide a very nice alternative in the form of an API". :-)
When I said that Enumerable.Create() is impossible, I meant, that Enumerable.Create() that would work for IEnumerable as well as the existing Observable.Create() works for IObservable is. And I still stand by that.
But you're still comparing a coroutine (Enumerable.Create) to call\cc (Observable.Create(Func<IObserver,IDisposable>)). The Func passed to the latter is not a coroutine!

This topic is about async iterators. The enumerable method is a synchronous iterator, but the observable method that you cited is NOT an async iterator.

To be fair, you must compare the scenario in which you implement Enumerable.Create without yield return or async/await, to the scenario in which you implement Observable.Create(Func<IObserver,Task>) without yield return or async/await. In other words, it's the coroutine that provides the state machine and it's the state machine that makes things complicated.

As I've shown, it's theoretically possible to implement a coroutine as an API, without yield return or async/await, and it likely could have been accomplished in C# 1.0 (or 2.0 if you want generics). Granted, it's pretty clean for simple cases but as complexity of the requirements grows so does the implementation of the state machine. (Still entirely unrelated to the point of my original analogy though ;-)

What I've conceded to is simply that the difficulty of implementing synchronous iterators using Enumerable.Create without async or yield is greater than the difficulty in implementing async iterators without the proposed language support, since we already have async and yield. (And in fact, Rx-Experimental used yield before C# 5 was released.) The analogy was ambiguous and I'm sorry for including it.

I hope that clears up any remaining confusion.
Aug 14, 2014 at 12:37 AM
davedev wrote:
@svick:
My real point is that I don't consider async iterators to be worth it.
But your arguments against my admittedly unfair analogy don't cover any of my primary arguments for this feature. You didn't even address the paragraphs that followed, which also showed that the point of my analogy was actually that introducing iterator blocks into C# put them into the minds of many developers, who then went on to figure out really good, common uses for them. Most devs rarely even considered that iterator blocks would be even remotely useful in C# and now we use them all the time. That was my point,
Do you mean this?

davedev wrote:
At the very least it will expose many developers to the benefits of reactive programming.
I really don't think that we should introduce new features into a language that are basically useless by themselves just to promote something.
When I said that Enumerable.Create() is impossible, I meant, that Enumerable.Create() that would work for IEnumerable as well as the existing Observable.Create() works for IObservable is. And I still stand by that.
But you're still comparing a coroutine (Enumerable.Create) to call\cc (Observable.Create(Func<IObserver,IDisposable>)). The Func passed to the latter is not a coroutine!
Well, yes, I'm comparing a library method used to implement IEnumerable to a method used to implement IObservable. Those are fundamentally different interfaces, so the methods also have to be different. What else should I be comparing?
This topic is about async iterators. The enumerable method is a synchronous iterator, but the observable method that you cited is NOT an async iterator.

To be fair, you must compare the scenario in which you implement Enumerable.Create without yield return or async/await, to the scenario in which you implement Observable.Create(Func<IObserver,Task>) without yield return or async/await. In other words, it's the coroutine that provides the state machine and it's the state machine that makes things complicated.
I don't think it makes sense to try to be "fair" to features. Before yield return, there wasn't yield return or await. In that environment, the feature made sense, so it was added. (Maybe it wouldn't make sense now, but it did then.) But now, we do have yield return and await, so you have to consider any new additions in terms of that. And new syntax for async iterators doesn't make sense in this environment to me.
As I've shown, it's theoretically possible to implement a coroutine as an API, without yield return or async/await, and it likely could have been accomplished in C# 1.0 (or 2.0 if you want generics). Granted, it's pretty clean for simple cases but as complexity of the requirements grows so does the implementation of the state machine. (Still entirely unrelated to the point of my original analogy though ;-)
I still don't think you've shown that. Your version of Enumerable.Create() is almost useless for anything even moderately complex.
Aug 14, 2014 at 5:52 AM
I really don't think that we should introduce new features into a language that are basically useless by themselves just to promote something.
Agreed, if async iterator blocks were "basically useless". But they're just not. Their introduction would actually have a lot in common with the introduction of async/await. Async programming was obviously possible before C# 5.0, but the async/await keywords are great because they helped us to avoid CPS and made async programming accessible to devs that find functional programming confusing (specifically, higher-order functions). Async iterator blocks would do exactly the same thing, but for sequences rather than scalar values. (See the chart at the end of my original post.) True, it's to a lesser degree because the body of an async iterator actually uses async/await, but it still avoids the initial CPS required by the current API, it avoids having to specify an explicit type argument (since it cannot be inferred, which makes the Create APIs particularly strange) and it would make reactive programming accessible to devs in a higher degree than async/await.

It's also not "basically useless" any more than several of the other proposed features in C# vNext. None of the recent proposals are "required" and many seem to offer much less significant functionality than async iterator blocks would. A few of them have already been implemented as APIs as well, so I'm just not buying your argument that async iterator blocks are "basically useless" simply because it can be implemented as a non-FCL API, which I might add is confusing to use for many devs and certainly isn't ideal.
What else should I be comparing?
I already answered that in my following paragraph. And you've already responded to it in your following paragraph., which I'll address next...
I don't think it makes sense to try to be "fair" to features. [snip]
It's not about features at all. You're simply comparing miles to kilometers without converting them to the same units of measurement first.

For context, here is your original reply in its entirety:
But then again, iterator blocks could have been implemented similarly as an API without being a language feature (e.g., Enumerable.Create), yet we've got them in C#.
I don't think they could have been implement that way. Observable.Create() works, because IObservable is "push", i.e. the consumer code is after the producer code in the call stack. So, observer.OnNext() just calls the consumer code, and everything works like it should.
But IEnumerable is "pull" (consumer code is before the producer code in the call stack). So yield return has to return from the current method and you can't do that with just a method call. I think you could implement Enumerable.Create() using await and custom awaitable, but yield return predates await by quite a few years.
Paraphrasing your original argument:
  1. Iterator blocks are impossible to implement without yield return or async/await.
  2. Observables are easily implemented without yield return or async/await.
  3. Therefore, iterator blocks were included out of necessity while asynchronous iterator blocks are unnecessary.
    Your conclusion is invalid because "asynchronous iterator blocks are unnecessary" cannot be deduced from "Observables are easily implemented".
In other words:
  1. Premise: First you referred to synchronous iterator blocks.
  2. Premise: Next you referred to an overload of Observable.Create that generates an asynchronous sequence; however, that overload is NOT an iterator! User code has no ability to await without resorting to CPS. Therefore, a native C# implementation would not be an "Asynchronous iterator block". It would be call\cc.
Nobody here was arguing for a native call\cc implementation in C#. We want a native coroutine for asynchronous sequences; i.e., async iterator blocks.

Corrected:
  1. Premise: Synchronous iterator blocks.
  2. Premise: Asynchronous iterator blocks. E.g., Observable.Create(Func<IObserver, Task>)
  3. Conclusion: Synchronous iterator blocks are necessary without async/await. Asynchronous iterator blocks are unnecessary with async/await.
At this point, the conclusions may be entirely valid; however, they are being deduced from each of the premises individually. No comparisons were made, yet...

But then you went on to compare the difficulties implementing the hypothetical Enumerable.Create with that of the existing Observable.Create(Func<IObserver, IDisposable>), though it wasn't a fair comparison.

As I was saying, to be fair, if you make a claim about the difficulties in implementing synchronous iterator blocks without async/await, and you compare it to the difficulty of implementing an asynchronous iterator block without async/await (e.g., you stated "So, observer.OnNext() just calls the consumer code, and everything works like it should." and your code example did not use await), then you must compare the overload of Observable.Create that actually implements an asynchronous iterator, not the overload that doesn't!

What I mean is that of course there's no difficulty in implementing Observable.Create(Func<IObserver,IDisposable>) without async/await. It's not even a coroutine to begin with! You're comparing apples to oranges. Comparing in terms of the difficulty without async/await, you must consider implementing Observable.Create(Func<IObserver,Task>) without async/await to be fair!

The conclusion of a fair comparison is that it's likely equally difficult to implement Enumerable.Create without async/await as it is to implement Observable.Create(Func<IObserver,Task>) without async/await (notice that this overload returns Task), yet both are definitely possible with some changes to their signatures, as I've already shown before for the sync world.

We were definitely off-topic by this point anyway. :-)

I've already twice conceded to your higher point that since we do have async/await, having native asynchronous iterator blocks would not make implementing async iterators substantially easier in terms of the coroutine since Observable.Create(Func<IObserver,Task>) already does a fine job of that.

But there's no sense in beating a dead horse. I'd prefer if you would address my actual points instead.
Nov 9, 2014 at 3:23 AM
Edited Nov 9, 2014 at 4:09 AM
Hmm. Is there reason why Observable.Where, .Select, etc methods in Rx don't have async overloads now? Will it be added later? Would be nice have
var xA = Enumerable.Range(0, 10);
var xObs = xA.ToObservable();
var xFiltered = await xObs
    .Where(x => await Task.FromResult(x != 5))
    .Select(x => await Task.FromResult((float) x))
    .ToList();
I understand why such things can't be used with IEnumerable, but await functions in rx linq queries are very welcome. Guaranteed order is not necessary (probably should pass as additional parameter).
Nov 9, 2014 at 9:51 AM
SelectMany has overloads that support tasks. This is what you should use instead of Select if you want to project into a Task and have the remainder of the computation await the result, assuming though that you want overlapping notifications; e.g., so-called "merge" or "flatten" semantics. SelectMany is the "sequential asynchronous composition" operator in Rx. It doesn't matter whether it's composing an IObservable<T> (cardinality = 0-infinity) or Task<T> (cardinality = 0-1), its behavior is essentially the same.

You can also do an async Where by simply composing SelectMany with a normal Where.

For example:
from x in xs
from y in FooAsync(x)
from z in PredicateAsync(y)
where z
select y;
Nov 9, 2014 at 2:01 PM
Edited Nov 9, 2014 at 2:05 PM
Yes, thanks, a little tricky syntax but works exactly as needed.