Featured image

Series: JavaScript Iterators and Generators

Introduction Link to heading

Here we are! It’s time for Asynchronous Generators! Are you excited? I am excited 😃.

This kinda exotic type of function was added to the language at the same time as async iterators were, and it answers the question: how async functions and generators should compose? If you think about it, async generators are still generators, so there shouldn’t be a lot of both conceptual and concrete (in their usage) differences with the sync counterpart. But async generators are async functions too, so the await keywords can be used inside them. That’s why the previous question can be reworded as follows: how can the yield keyword work in conjunction with the await keyword?  

Asynchronous Generators Link to heading

The answer involves state machines and queues, plus a clever algorithm that is damn well explained here. Trying to summarize and simplify, each time then next/throw/return method is called, a Promise is immediately returned and the call itself is enqueued. The async generator instance can be in two states: PAUSED or RUNNING. The PAUSED state means that the instance was waiting to start or was paused on a yield. In such a case, the call just enqueued will resume the generator and the next yielded value, or the next thrown error, will be used to fulfil, or reject, the Promise previously returned from that call. But the generator could be in the RUNNING state, and this means that it is awaiting some async operation. Therefore it is still handling a previous iteration’s method call and the new one mustn’t have any side effect on the generator instance’s state. It should just be queued. The circle is closed because each time a yield keyword is encountered, if there is at least one enqueued call, the generator won’t PAUSE. The oldest enqueued call will be dequeued to be handled and the generator will continue with its flow.

Although it should be clear enough at this point, I want to stress that objects returned by async generators implement the AsyncIterator interface we saw in the last article. Therefore they are perfect to create async iterables. Moreover, the implicit iteration’s methods calls queue contrasting a possible high consumer pressure, the fact that all async iterators returned are async iterables as well (remember the return this convention) and, last but not least, the impossibility of returning a Promise as an iteration result’s value, following the spec hints, are further excellent reasons for preferring async generators over a manual implementation of the async iteration interfaces.  

Our first async generator Link to heading

Let’s revisit the remotePostsAsyncIteratorsFactory we built in the previous article. The code is self-explanatory enough:

async function* remotePostsAsyncGenerator() {
    let i = 1;
    
    while(true) {

        const res = await fetch(`https://jsonplaceholder.typicode.com/posts/${i++}`)
                                .then(r => r.json());
        
        // when no more remote posts will be available,
        // it will break the infinite loop.
        // the async iteration will end
        if (Object.keys(res).length === 0)  { break; }

        yield res;
    }
}

The act of yielding something we do not yet have could sound weird to you, but remember that a temporally ordered set of Promises is involved, which will be fulfilled only when data become available. Surely you will have noticed the qualitative leap respect to the previous manual implementation of the async iteration interfaces. That’s because although async generators seem scary and intimidating at first, they help us to write some very clear, compact and effective code, despite the considerable level of abstraction.  

yield-on, await-off Link to heading

Let’s become more confident with the behaviour of these two keywords.

As we did before, you can wait for the completion of a Promise, to then yield its fulfilment value:

const asyncSource = {
    async *[Symbol.asyncIterator]() {
        const v = await new Promise(res => setTimeout(res, 1000, 1));
        yield v;
    }
}

You can do the same with only one line, like a true __pro__grammer:

const asyncSource = {
    async *[Symbol.asyncIterator]() {
        yield await new Promise(res => setTimeout(res, 1000, 1));
    }
}

In this case, you can avoid using the await keyword at all! That’s because, when it comes to async iteration, an IteratorResult’s value should never be a Promise. Therefore, async generators won’t let the Promise pass. Each yielded Promise will be implicitly awaited:

const asyncSource = {
    async *[Symbol.asyncIterator]() {
        yield new Promise(res => setTimeout(res, 1000, 1));
    }
}

 

Otherwise, if you plan to insert a Promise into the generator, using the next method, you will have to manually await it:

const asyncSource = {
    async *[Symbol.asyncIterator]() {
        // the good, old operators precedence...
        const nextArgument = await (yield ...);
    }
}

 

The next script is a good test to know if you have understood how await and yield work together:

const asyncSource = {
    async *[Symbol.asyncIterator]() {
            console.log(await new Promise(res => setTimeout(res, 1000, 1)));
            console.log(await new Promise(res => setTimeout(res, 2000, 2)));
            yield 42;
    }
}

const ait = asyncSource[Symbol.asyncIterator]();

ait.next().then(({value}) => console.log(value));

What will it log?

// 1
// 2
// 42

You could be tempted to think that, since the 42 is already known, it we’ll be almost immediately returned. But this is not how async generators works: a Promise is instantly returned by the next method, but its resolution is deferred until the first yield is encountered.  

Every other consideration we made about sync generators, like the behaviour of the try-catch block, generators delegation and so on, also apply to the async variant. Therefore, I chose to follow a DRY approach. Just keep in mind that async generators work in the async realm, so fulfilled or rejected Promises are used instead of spatial values and exceptions. Be aware that the yield* keyword does support both async and sync iterables. It uses a conversion process similar to the one adopted by the for-await-of.  

A more complex real-life example Link to heading

The following is a possible approach for the case where a remote resource has a very strict limit on the number of requests, so we have to fetch more data with one request but we want to handle them in smaller chunks:

// do you remember it?
function* chunkify(array, n) {
    yield array.slice(0, n);
    array.length > n && (yield* chunkify(array.slice(n), n));
}

async function* getRemoteData() {
    let hasMore = true;
    let page;

    while (hasMore) {
        const { next_page, results } =
            await fetch(URL, { params: { page } }).then(r => r.json());

        // return 5 elements with each iteration
        yield* chunkify(results, 5);

        hasMore = next_page != null;
        page = next_page;
    }
}

At each iteration, getRemoteData will return one chunk of the previously fetched array, thanks to the delegation to chunkify. Sometimes the generator will fetch another bunch of data from the remote source, sometimes it will not. There will be no differences from a consumer’s point of view.

It’s noteworthy that each IteratorResult is strictly dependent on the previous ones. Each iteration could be the last: local to an array of results or global. Furthermore, to get the next bunch of data, we need the previous next_page. It would be impossible to maintain consistency without the queuing logic of the async generators.  

Generators Piping Link to heading

There is a powerful pattern, conceptually similar to the Unix piping, based on the fact that each generator produces an iterator-iterable that can be iterated.

The overall idea is to combine multiple generators sequentially, like a pipe combine multiple processes, feeding each generator with the values yielded by the one which precedes it. Because each generator is able to do whatever it wants with those values, many pretty interesting things can be done, both sync and async 😃. From simply logging them for debugging purposes, to completely transforming them. From yielding out only those that satisfy a given condition, to yield a composition of them. Two compound generators are not required to yield at the same frequency. Therefore the outer is able to iterate over the inner more than once before yielding something. But it is equally capable of producing more than one value using only one chunk of data received from its source.  

Synchronous piping Link to heading

To better understand the idea, let’s start doing something easy and sync. Let’s say we have a collection of numbers and we want to multiply each one by 2, to then subtract 1 and, finally, remove all non-multiples of 3:

function* multiplyByTwo(source) {
    for(const value of source) {
        yield value * 2;
    }
}

function* minusOne(source) {
    for(const value of source) {
        yield value - 1;
    }
}

function* isMultipleOfThree(source) {
    for(const value of source) {
        !(value%3) && (yield value);
    }
}

const collection = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

// here the piping
[...isMultipleOfThree(minusOne(multiplyByTwo(collection)))]; // [3, 9, 15]

There are some considerations to make. The first concerns the high reusability, maintainability and extensibility of this approach. Each generator expects an iterable, that is an interface, so it knows nothing about its source. Furthermore, it produces another iterable, so it doesn’t need to know anything about its consumer. You can compose these functions as you want, reusing them whenever you need. The second is about memory efficiency: unlike arrays' methods such as map, flatMap and filter, there isn’t a collection that is recreated from scratch at each step of the pipeline. Each value flows from the source to the end, or to the generator that will discard it. This is a remarkable resemblance to transducers.  

Asynchronous piping Link to heading

This is also applicable to the async realm thanks to async generators. Let’s consider a fairly widespread example, that is reading a file using Node.js streams, which are async iterables:

// yaffee
(async function IIAFE() {
    const fileStream = new FileReaderByLines('test.txt');

    for await (const line of fileStream) {
        console.log('> ' + line);
    }
})();

We are creating an instance of the FileReaderByLines class, that we will examine shortly, to then asynchronously iterating over it thanks to the for-await-of loop.

Here it is the FileReaderByLines class:

const fs = require('fs');

// transform chunks into lines
async function* chunksToLines(chunksSource) {
    
    let previous = '';

    for await (const chunk of chunksSource) {
        previous += chunk;
        let eolIndex;

        while ((eolIndex = previous.indexOf('\n')) >= 0) {
            const line = previous.slice(0, eolIndex + 1);

            yield line;
            
            previous = previous.slice(eolIndex + 1);
        }
    }

    if (previous.length > 0) {
        yield previous;
    }
}

class FileReaderByLines {
    constructor(file) {
        this.file = file;
    }
        
    [Symbol.asyncIterator]() {
        const readStream = fs.createReadStream(this.file, {
                encoding: 'utf8',
                highWaterMark: 1024
        });

        // PIPING
        // readStream produces an async iterable over chunks of the file
        // we feed 'chunksToLines' with it
        // 'chunksToLines' produces an async iterable that we return
        return chunksToLines(readStream); 
    }          
}

Let me explain what is happening.

The @@asyncIterator method takes care of handling the data source, the file, using a readable stream. The call to fs.createReadStream method creates the stream by setting the file to read, the encoding used by the file and the maximum size of an internal buffer. The dimension of each chunk of data returned by the stream will be at most 1024 bytes. The stream, when iterated, will return the content of the asynchronously filled buffer when it becomes available.

If the following for-await-of is used, the outcome would be the printing of the whole file, 1024 bytes at a time:

for await (const buffer of readStream) {
    console.log(buffer);
}

Instead, we are using the piping pattern to transform one, or more if necessary, 1024 bytes long chunk into lines. We compose the async iterable returned by fs.createReadStream with chunksToLines, an async generator that takes an async iterable and returns another one. The latter, at each async iteration, will return a line, not a chunk. It’s noteworthy that most of the time the two async iterables produce values at a different frequency.

At the IIAFE level we could choose, for some weird reason, to capitalize each line. We can easily do that by inserting another async generator into the pipeline:

async function* capitalize(asyncSource) {
    for await (const string of asyncSource) {
        yield string[0].toUpperCase() + string.slice(1);
    }   
}

// yaffee
(async function IIAFE() {
    const fileStream = new FileReaderByLines('test.txt');

    for await (const line of capitalize(fileStream)) { // <-- here the PIPING
        console.log('> ' + line);
    }
})();

In this way, we are composing the async iterable fileStream with capitalize, another async generator that takes an async iterable and returns a fresh new one. The latter, at each async iteration, will return capitalized lines.

 

Conclusion Link to heading

That’s all folks!

We have learnt why we need Asynchronous Generators, how they work and how they can be used to build amazing things! Thanks to them, creating well-formed async iterables is a breeze. We have also learnt about both sync and async generators piping, a powerful pattern that I’m sure will expand your horizons.

If you’ve enjoyed this journey through JavaScript Iterators and Generators, please help me share it to as many JS developers as possible 💪🏻😃. And, as usual, I hope to see you again 🙂 and on twitter!

 

Acknowledgements Link to heading

I would like to thank Nicolò Ribaudo and Marco Iamonte for the time spent helping me to improve the quality of the article.

 

Bibliography Link to heading