Concurrent Data Processing in Elixir by Svilen Gospodinov
Author: Svilen Gospodinov
Language: eng
Format: epub
Tags: Pragmatic Bookshelf
Publisher: Pragmatic Bookshelf
The handle_call/3, handle_cast/2, and handle_info/2 callbacks are called when you invoke GenStage.call/3, GenStage.cast/2, or Process.send/3, respectively. However, the return values of these callbacks differ in one important way from their GenServer counterparts. Here are two examples of return tuples allowed for GenStage:
    {:reply, reply, [event], new_state}
    {:noreply, [event], new_state}
Notice the extra element in the tuple, which holds a list of events. These callbacks dispatch events in exactly the same way as the handle_demand/2 callback. This is great news, because it gives us a lot of flexibility when dispatching events.
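To make the :reply shape concrete, here is a minimal sketch of a producer that dispatches events from handle_call/3. The module name ReplyingProducer, the function scrape_pages_sync/1, and the {:ok, count} reply are hypothetical and chosen only for illustration. The Mix.install/2 line assumes Elixir 1.12 or later and is only needed when running the snippet as a standalone script.

```elixir
# Standalone-script setup (assumes Elixir 1.12+).
# Inside a Mix project that already depends on gen_stage, skip this line.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule ReplyingProducer do
  # Hypothetical module, used only to illustrate the :reply return tuple.
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Hypothetical synchronous API: the caller blocks until the producer replies.
  def scrape_pages_sync(pages) when is_list(pages) do
    GenStage.call(__MODULE__, {:pages, pages})
  end

  @impl true
  def init(state), do: {:producer, state}

  @impl true
  def handle_demand(_demand, state) do
    # No events are produced on demand; they arrive through handle_call/3.
    {:noreply, [], state}
  end

  @impl true
  def handle_call({:pages, pages}, _from, state) do
    # Second element: the reply sent back to the caller.
    # Third element: the list of events to dispatch.
    {:reply, {:ok, length(pages)}, pages, state}
  end
end
```

Compared with a cast, the caller gets a confirmation back, at the cost of waiting for the producer to handle the message.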
Let's implement our API in PageProducer:
scraper/lib/page_producer.change1.ex
    def scrape_pages(pages) when is_list(pages) do
      GenStage.cast(__MODULE__, {:pages, pages})
    end

    def handle_cast({:pages, pages}, state) do
      {:noreply, pages, state}
    end
We have exposed a function scrape_pages/1 which accepts a list of URLs. This function will be our user-facing API. Inside the function, we call GenStage.cast/2, just like we did with GenServer before.
In the handle_cast/2 callback function, we return a tuple as a result. The first element in the tuple is always the type of reply, which is :noreply in this case. The second element must be a list, containing the events we want to dispatch. We are going to return pages, which contains our list of strings. Finally, the third element is the process state, which you can update if you need to. Here, we are just returning it unchanged. As you can see, the return tuple format is very similar to the one for GenServer, but with the addition of the events element.
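The third element becomes more interesting once the state actually changes. The following is a minimal sketch, assuming a producer whose state is a running count of the pages handed over for dispatch. CountingProducer and dispatched_count/0 are hypothetical names that do not appear in the scraper project, and the Mix.install/2 line assumes Elixir 1.12 or later for standalone use.

```elixir
# Standalone-script setup (assumes Elixir 1.12+).
# Inside a Mix project that already depends on gen_stage, skip this line.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule CountingProducer do
  # Hypothetical module: same cast-based API, but the state is updated.
  use GenStage

  def start_link(_args) do
    # The state is the number of pages handed over for dispatch so far.
    GenStage.start_link(__MODULE__, 0, name: __MODULE__)
  end

  def scrape_pages(pages) when is_list(pages) do
    GenStage.cast(__MODULE__, {:pages, pages})
  end

  # Hypothetical helper for reading the counter.
  def dispatched_count do
    GenStage.call(__MODULE__, :dispatched_count)
  end

  @impl true
  def init(count), do: {:producer, count}

  @impl true
  def handle_demand(_demand, count), do: {:noreply, [], count}

  @impl true
  def handle_cast({:pages, pages}, count) do
    # Dispatch the pages and return the updated counter as the new state.
    {:noreply, pages, count + length(pages)}
  end

  @impl true
  def handle_call(:dispatched_count, _from, count) do
    # Reply with the counter; the empty list means no events are dispatched.
    {:reply, count, [], count}
  end
end
```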
Let's run our application again using the IEx shell:
    $ iex -S mix
You should see output similar to this:
    Erlang/OTP 21 [erts-10.0.3] [source] [64-bit] [smp:4:4] [ds:4:4:10]
    [async-threads:1] [hipe] [dtrace]

    Compiling 1 file (.ex)

    16:10:30.437 [info] PageProducer init
    16:10:30.443 [info] PageConsumer init
    16:10:30.443 [info] PageProducer received demand for 3 pages

    Interactive Elixir (1.8.1) - press Ctrl+C to exit (type h() ENTER for help)

    iex(1)>
As expected, the consumer sends demand as soon as it is initialized. Since our handle_demand/2 callback does not return events, this initial demand is not satisfied and therefore the consumer will wait until events are available.
Now, we're going to create a list of URLs and call our API:
    iex(1)> pages = [
    ...(1)> "google.com",
    ...(1)> "facebook.com",
    ...(1)> "apple.com",
    ...(1)> "netflix.com",
    ...(1)> "amazon.com"
    ...(1)> ]
    ["google.com", "facebook.com", "apple.com", "netflix.com", "amazon.com"]

    iex(2)> PageProducer.scrape_pages(pages)
Let's look closely at the output log:
    16:19:51.733 [info] PageConsumer received ["google.com",
    "facebook.com", "apple.com"]
    16:20:02.742 [info] PageProducer received demand for 1 pages
    16:20:02.743 [info] PageConsumer received ["netflix.com", "amazon.com"]
We can see that PageConsumer immediately received the first three pages, which took a bit of time to process, judging by the timestamps. Since only two pages were available next, our consumer realized that it had capacity for one more page, so it immediately issued demand for another page while starting work on the other two. That's great, everything is working as expected.
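The arithmetic behind this log can be approximated in plain Elixir. This is a simplified sketch, not GenStage's actual implementation: the DemandModel module and its ask/2 and push/2 functions are hypothetical, and the model assumes a single consumer that asks for three events at a time.

```elixir
defmodule DemandModel do
  # Hypothetical, simplified model of a producer; not GenStage's real internals.
  # `buffer` holds events that nobody has asked for yet.
  # `pending` counts events that were asked for but not yet delivered.
  defstruct buffer: [], pending: 0

  # The consumer asks for `count` more events.
  # Returns {events_dispatched_now, updated_model}.
  def ask(%__MODULE__{} = model, count) when count > 0 do
    {events, rest} = Enum.split(model.buffer, model.pending + count)
    pending = model.pending + count - length(events)
    {events, %{model | buffer: rest, pending: pending}}
  end

  # New events arrive, for example through scrape_pages/1.
  # Only as many as the pending demand allows are dispatched; the rest wait.
  def push(%__MODULE__{} = model, new_events) when is_list(new_events) do
    {events, rest} = Enum.split(model.buffer ++ new_events, model.pending)
    {events, %{model | buffer: rest, pending: model.pending - length(events)}}
  end
end

model = %DemandModel{}

# Initial demand for 3 pages: nothing is available, so nothing is dispatched.
{[], model} = DemandModel.ask(model, 3)

pages = ~w(google.com facebook.com apple.com netflix.com amazon.com)
{first, model} = DemandModel.push(model, pages)

# After finishing the first batch, the consumer asks for 3 more.
{second, model} = DemandModel.ask(model, 3)

IO.inspect(first)         # ["google.com", "facebook.com", "apple.com"]
IO.inspect(second)        # ["netflix.com", "amazon.com"]
IO.inspect(model.pending) # 1
```

Under this model, the leftover pending count of 1 corresponds to the "received demand for 1 pages" line in the log: two of the three requested pages were already waiting, so only one request remains unsatisfied.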
Congratulations, you just created your first data-processing pipeline with GenStage! We have created a producer and a consumer, put them to work together and introduced a lot of new concepts on the way.
However, we are not finished with our scraper project just yet.