Building a scalable focused web crawler with Flink


1. Flink Web Crawler Ken Krugler | President, Scale Unlimited

2.whoami? • Ken Krugler, Scale Unlimited • Krugle code search startup in 2006 • Lots of big data / web crawl / search projects • Started working with Flink in 2016

3.What I’m Really Covering • Some advanced features of Flink • AsyncFunctions • Iterations • Abusing state as a DB

4.The Cover Story • Can I use Flink to build a web crawler? • Scalable - One would hope so • Focused - only fetch the “good stuff” • Efficient - Always Be Crawling • Simple - no other infrastructure

5.Web Crawling 101

6.Conceptually Simple • Diagram: Seed URLs → URL DB

7.Conceptually Simple • Diagram: Seed URLs → URL DB → Robots Checker

8.Conceptually Simple • Diagram: Seed URLs → URL DB → Robots Checker → Page Fetcher

9.Conceptually Simple • Diagram: Seed URLs → URL DB → Robots Checker → Page Fetcher → Content Parser

10.It Seemed Too Easy

11.It Just Worked • But then we needed better performance • Throughput was really stinky • Like a few pages per second, yikes • That’s not the beautiful scaling I want

12.Massive Parallelism • Functions make HTTP GET calls • Most time spent waiting for response • We need 1000+ active requests/server • Please don’t do this… • fetchStream.setParallelism(1000); • Diagram: Robots Checker, Page Fetcher


14.AsyncFunction

DataStream<Result> postRobotsUrls = AsyncDataStream.unorderedWait(
    preRobotsUrls, new CheckUrlWithRobotsFunction(), 20, TimeUnit.SECONDS);

public class CheckUrlWithRobotsFunction
        extends RichAsyncFunction<FetchUrl, Result> {

    @Override
    public void asyncInvoke(FetchUrl url, ResultFuture<Result> futureResult) {
        // …some async stuff happens, and then…
        futureResult.complete(result);
    }
}
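The same "don't block in asyncInvoke()" pattern can be sketched outside Flink with a plain CompletableFuture: the work runs on another thread and completes a future when done. The checkRobots() method and its ":allowed" result are invented for this illustration, not from the talk.

```java
import java.util.concurrent.CompletableFuture;

// Minimal, non-Flink sketch of the async pattern: the caller gets a future
// immediately; the (simulated) robots check runs on the common pool.
public class AsyncPattern {
    static CompletableFuture<String> checkRobots(String url) {
        return CompletableFuture.supplyAsync(() -> {
            // Real code would fetch and parse /robots.txt here; in Flink
            // you would call futureResult.complete(...) when it finishes.
            return url + ":allowed";
        });
    }
}
```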

15.Timeout Failures • A timeout kills the workflow • So make sure the client times out first • Maybe use the Flink timeout as a safety net

Caused by: java.util.concurrent.TimeoutException: Async function call has timed out.
    at AsyncWaitOperator$1.onProcessingTime(
    at SystemProcessingTimeService$
    at Executors$
    at ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(
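One way to make the client time out first (so the Flink-level timeout, which fails the job, stays a safety net) is to give the call its own shorter timeout that resolves to a fallback status. This sketch uses Java 9's CompletableFuture.completeOnTimeout(); the method name fetchWithTimeout() and the "FETCH_TIMED_OUT" sentinel are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Client-side timeout sketch: if the underlying call hasn't completed within
// `millis`, the future resolves with a fallback value instead of hanging
// until the async operator's timeout kills the whole workflow.
public class ClientTimeout {
    static CompletableFuture<String> fetchWithTimeout(CompletableFuture<String> call,
                                                      long millis) {
        return call.completeOnTimeout("FETCH_TIMED_OUT", millis, TimeUnit.MILLISECONDS);
    }
}
```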

16.What backpressure? • Async I/O thread handles complete() call • Which means nobody is calling collect() • But that’s what is monitored for back pressure • So UI shows no backpressure, ever

17.No Side Outputs • No side outputs for AsyncFunctions • But we have multiple outputs • Diagram: Robots Checker → Status Update / Sitemap Fetcher / Page Fetcher

18.Tuple3<x, y, z> Output

public class CheckUrlWithRobotsFunction extends
    RichAsyncFunction<FetchUrl, Tuple3<CrawlStateUrl, FetchUrl, FetchUrl>> {

• Emit “combo” record • Only one field will be non-null • Split into three streams later • Diagram: Robots Checker → Select → Status Update / Sitemap Fetcher / Page Fetcher
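The combo-record split can be sketched outside Flink as a record type with exactly one non-null field and a downstream selector that keeps only the records for its stream. The ComboRecord class and field names here are assumptions, not the talk's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative (non-Flink) version of the "combo record" trick: one output
// type, exactly one field set per record, split into separate streams later.
public class ComboSplit {
    static class ComboRecord {
        final String statusUpdate;  // -> Status Update stream
        final String sitemapUrl;    // -> Sitemap Fetcher stream
        final String pageUrl;       // -> Page Fetcher stream
        ComboRecord(String status, String sitemap, String page) {
            this.statusUpdate = status;
            this.sitemapUrl = sitemap;
            this.pageUrl = page;
        }
    }

    // The later "select" step just keeps records whose field is non-null;
    // the sitemap and page selectors are analogous.
    static List<String> statusStream(List<ComboRecord> combos) {
        List<String> out = new ArrayList<>();
        for (ComboRecord c : combos) {
            if (c.statusUpdate != null) out.add(c.statusUpdate);
        }
        return out;
    }
}
```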

19.Chaining backpressure • Flink “chains” operators • This happens in the Async I/O thread • Often later operators need higher parallelism • Diagram: Page Fetcher → Content Parser

20.That’s not a Crawler!

21.Status Updates • Diagram: Seed URLs → URL DB → Robots Checker → Page Fetcher → Content Parser, with Status Update feeding back into the URL DB

22.And Outlinks • Diagram: Seed URLs → URL DB → Robots Checker → Page Fetcher → Content Parser, with Status Update and Links feeding back into the URL DB

23.Iterations to the Rescue

24.Iterations

IterativeStream<CrawlStateUrl> urlDbIteration = seedUrls.iterate(maxWaitTime);
…
urlDbIteration.closeWith(robotBlockedUrls);

• Timeout is optional • Creates a virtual source & sink • Memory-based queue in between • Diagram: IterationSource → UrlDBFunction → Functions… → IterationSink
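Logically, the iteration turns the pipeline into a work queue whose output feeds its own input: closeWith() routes newly discovered URLs back to the head. A plain-Java sketch of that feedback loop (not Flink code; crawl() and the outlinks callback are invented for illustration):

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Work-queue model of the iteration: process a URL, feed unseen outlinks
// back into the queue (the "closeWith" feedback), stop when nothing is left.
public class CrawlLoop {
    static Set<String> crawl(List<String> seeds,
                             java.util.function.Function<String, List<String>> outlinks) {
        Set<String> seen = new HashSet<>(seeds);
        Queue<String> queue = new ArrayDeque<>(seeds);
        while (!queue.isEmpty()) {
            String url = queue.remove();
            for (String link : outlinks.apply(url)) {
                if (seen.add(link)) {
                    queue.add(link);  // feedback edge: new URL re-enters the pipeline
                }
            }
        }
        return seen;
    }
}
```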

25.Fun with Fan-out • Every page can have 100s of links • So you get “fan-out” • Which fills up network buffers • Causing gridlock • And the entire workflow locks up

26.Throttling • We track “in-flight” URLs in the URL DB function • Don’t emit a URL to fetch if “too busy” • Not a great solution • Assumes no data skew • In-flight limit is a rough approximation (depends on network buffer size and fan-out record size) • Diagram: URL DB (1/2) and URL DB (2/2) → Parser (1/2) and Parser (2/2), with Links feeding back
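The in-flight throttle can be sketched as a counter with a limit: increment when a URL is emitted for fetching, decrement when its status update comes back, and hold back new fetches at the limit. This class and its method names are assumptions for illustration, not the talk's actual code.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the throttle in the URL DB function: count URLs that have been
// emitted but whose status update hasn't arrived yet, and refuse to emit
// more once the limit is hit ("too busy").
public class InFlightThrottle {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final int limit;

    InFlightThrottle(int limit) {
        this.limit = limit;
    }

    // Returns true if a URL may be emitted for fetching right now.
    boolean tryEmit() {
        while (true) {
            int n = inFlight.get();
            if (n >= limit) return false;  // too busy: keep the URL queued
            if (inFlight.compareAndSet(n, n + 1)) return true;
        }
    }

    // Called when a status update arrives back at the URL DB.
    void onStatusUpdate() {
        inFlight.decrementAndGet();
    }
}
```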

27.Checkpointing • Only works when sources are emitting • We have to emit “tickler” tuples • Diagram: Seed/tickler URLs → URL DB → Robots Checker → Page Fetcher → Content Parser, with Status Update and Links feeding back into the URL DB

28.Dropping In-flight Data • Can’t checkpoint “in-flight” data • FLIP-15/16 will address this • But we don’t really care • Diagram: URL DB → Robots Checker → Page Fetcher, with Status Update feeding back into the URL DB

29.When are we done? • Normally a job ends when its source is empty • With an iteration that never happens, so you cancel the job • Iteration timeout is fragile • We emit a special “termination” tuple • It shuts down the rest of the workflow • Work in progress