Concurrent Data Processing in Elixir by Svilen Gospodinov

They will be called when you invoke GenStage.call/3, GenStage.cast/2, or Process.send/3, respectively. However, the return signatures of those callbacks differ from their GenServer counterparts in an important way. Here are two examples of return tuples allowed for GenStage:

  {:reply, reply, [event], new_state}

  {:noreply, [event], new_state}

Notice the extra element in the tuple that holds a list of event values. Events returned from these callbacks are dispatched to consumers just like the ones returned from handle_demand/2. This is great news, because it gives us a lot of flexibility when dispatching events.
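
To make this concrete, here is a minimal sketch of a producer callback that uses the :reply shape. The {:add, page} message and the :ok reply are made up for illustration and are not part of our scraper project:

  def handle_call({:add, page}, _from, state) do
    # Reply :ok to the caller and dispatch the page to consumers in one go.
    {:reply, :ok, [page], state}
  end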

Let’s implement our API in PageProducer:

scraper/lib/page_producer.change1.ex

  def scrape_pages(pages) when is_list(pages) do
    GenStage.cast(__MODULE__, {:pages, pages})
  end

  def handle_cast({:pages, pages}, state) do
    {:noreply, pages, state}
  end

We have exposed a function, scrape_pages/1, which accepts a list of URLs. This function will be our user-facing API. Inside it, we call GenStage.cast/2, just like we did with GenServer before.

In the handle_cast/2 callback function, we return a tuple as a result. The first element in the tuple is always the type of reply, which is :noreply in this case. The second element must be a list, containing the events we want to dispatch. We are going to return pages, which contains our list of strings. Finally, the third element is the process state, which you can update if you need to. Here, we are just returning it unchanged. As you can see, the return tuple format is very similar to the one for GenServer, but with the addition of the events element.
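
For comparison, a plain GenServer handling the same message would return a tuple with no events element. The sketch below is shown only for contrast and is not part of our producer:

  def handle_cast({:pages, _pages}, state) do
    # A GenServer returns just the new state; there is no events element,
    # because a GenServer does not dispatch events to consumers.
    {:noreply, state}
  end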

Let’s run our application again using the IEx shell:

  $ iex -S mix

You should see an output similar to this:

  Erlang/OTP 21 [erts-10.0.3] [source] [64-bit] [smp:4:4] [ds:4:4:10]
  [async-threads:1] [hipe] [dtrace]

  Compiling 1 file (.ex)

  16:10:30.437 [info] PageProducer init
  16:10:30.443 [info] PageConsumer init
  16:10:30.443 [info] PageProducer received demand for 3 pages

  Interactive Elixir (1.8.1) - press Ctrl+C to exit (type h() ENTER for help)

  iex(1)>

As expected, the consumer sends demand as soon as it is initialized. Since our handle_demand/2 callback does not return events, this initial demand is not satisfied and therefore the consumer will wait until events are available.
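
For reference, a handle_demand/2 callback that returns no events could look like the sketch below; the exact code is an assumption rather than the project’s implementation, and it presumes the module already has require Logger. The log message mirrors the "received demand for 3 pages" line above:

  def handle_demand(demand, state) do
    Logger.info("PageProducer received demand for #{demand} pages")
    # No events are available yet, so we dispatch an empty list and let
    # GenStage keep track of the pending demand for us.
    {:noreply, [], state}
  end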

Now, we’re going to create a list of URLs and call our API:

  iex(1)> pages = [
  ...(1)>   "google.com",
  ...(1)>   "facebook.com",
  ...(1)>   "apple.com",
  ...(1)>   "netflix.com",
  ...(1)>   "amazon.com"
  ...(1)> ]
  ["google.com", "facebook.com", "apple.com", "netflix.com", "amazon.com"]

  iex(2)> PageProducer.scrape_pages(pages)

Let’s look closely at the output log:

  16:19:51.733 [info] PageConsumer received ["google.com",
  "facebook.com", "apple.com"]
  16:20:02.742 [info] PageProducer received demand for 1 pages
  16:20:02.743 [info] PageConsumer received ["netflix.com", "amazon.com"]

We can see that PageConsumer immediately received the first three pages, which took a bit of time to process, judging by the timestamps. Since only two pages were available next, our consumer realized it had capacity for one more page, so it immediately issued demand for another page while starting work on the two it received. That’s great, everything is working as expected.
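
The gap between the two batches comes from the time the consumer spends on each page. A handle_events/3 callback along the lines of this sketch would produce that behaviour; the Process.sleep/1 call stands in for real scraping work and is an assumption, not the chapter’s exact code:

  def handle_events(pages, _from, state) do
    Logger.info("PageConsumer received #{inspect(pages)}")

    # Simulate the time it takes to scrape each page.
    Enum.each(pages, fn _page -> Process.sleep(:timer.seconds(2)) end)

    # Consumers never dispatch events, so the events list stays empty.
    {:noreply, [], state}
  end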

Congratulations, you just created your first data-processing pipeline with GenStage! We have created a producer and a consumer, put them to work together, and introduced a lot of new concepts along the way.

However, we are not finished with our scraper project just yet.


