Wired Geometric Routing - University of Oxford

Trends in Scalable Stream Processing: Parallelism & Programmability

Peter R. Pietzuch
[email protected]
Large-Scale Distributed Systems Group
Department of Computing, Imperial College London
http://lsds.doc.ic.ac.uk
ATI Oxford Meeting 2016 - London

How to Make Sense of Big Data On-The-Fly?
  • More data than ever is
      • created: 2.5 exabytes (billion GBs) generated each day in 2015
      • stored: hard drive cost per GB dropped from $8.93 (2000) to $0.03 (2014)
  • Many new sources of data become available
      • sensors, mobile devices, cameras
      • web feeds, social networking
      • databases
      • scientific instruments
  • But data value decreases over time

Peter Pietzuch - Imperial College London

Real-Time Analysis of Sensed Data
  • Supporting intelligent transportation services
  • Many interested parties
      • Road authorities, traffic planners, emergency services, commuters
  • Analytics queries
      • Intelligent route planning: "What is the best time/route for my commute through central London between 7am-8am?"
      • Emergency response
      • Informing urban planning

Machine Learning for Web Analytics
  • Problem: Provide up-to-date predictions regarding which ads to serve
  • Solution: AdPredictor
      • Bayesian online learning algorithm ranks adverts according to click probabilities
  [Figure: AdPredictor dataflow with pre-process, update and predict stages over features f1 ... fn and label y ∈ {-1, 1}; annotations: share state, parallelize, aggregate, iterate]

Real-Time Social Data Mining
  • Social Cascade Detection

Throughput and Latency Matter

  System              High-throughput streams                     Low-latency results
  Facebook Insights   Aggregates 9 GB/s                           < 10 sec latency
  Google Zeitgeist    40K user queries/s (1-sec windows)          < 1 ms latency
  Feedzai             40K credit card transactions/s              < 25 ms latency
  NovaSparks          150M trade options/s                        < 1 ms latency

Design Space for Big Data Systems
  • Volume and Velocity
  • Algorithmic complexity
      • Arbitrary data transformation
      • Iterative algorithms
      • Large state as part of computation
  [Figure: data amount (GBs, TBs, PBs, EBs) against latency (days, hours, mins, secs, millisecs), with regions labelled "Easy", "Hard for complex algorithms" and "Hard for all algorithms"]

Database Centric Processing Doesn't Work
  • Traditional Database Management System (DBMS)
  • Data relatively static but queries dynamic
  • Potentially high query latency
  • Updates limited by disk I/O
  • Indices of limited use for fast changing data
  [Figure: queries are submitted to a DBMS holding data and an index, which returns results]

Stream Processing to the Rescue!
  • Data Stream Processing System (DSPS)
  • Data represented as time-dependent data stream
  • Queries static but data dynamic
  • Low latency result stream
  • In-memory architecture
  • Transient state
  [Figure: a stream enters a DSPS holding queries and working storage, which emits results]

Relational Data Stream Model
  • Streams consist of infinite sequence of tuples
  • Tuples typically associated with timestamp, e.g. arrival time, time of reading, ...
  • Tuples can have fixed relational schema
      • Set of attributes
  • Example: Vehicles(highway, segment, direction, speed)
      highway = M25
      segment = 42
      direction = north
      speed = 85
  [Figure: sensors data stream of vehicle speed data; tuples (highway, segment, direction, speed) arrive at times t1, t2, t3, t4, ... along a time axis]

Windows for Processing Infinite Streams
  • How many tuples should we process each time?
  • Process tuples in window-sized batches
  • Time-based window with size τ at current time t
      • [t - τ : t]   Vehicles[Range τ seconds]
      • [t : t]       Vehicles[Now]
  • Count-based window with size n: last n tuples
      • Vehicles[Rows n]
  [Figure: stream of (highway, segment, direction, speed) tuples with a window covering the most recent tuples up to "now"]
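The two window types above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not CQL or any engine's API: the `Vehicle` record, its `ts` timestamp field, and the helper names `range_window` and `rows_window` are hypothetical, and the stream is assumed to be a finite list ordered by timestamp.

```python
from collections import namedtuple

# Hypothetical tuple layout: the Vehicles schema from the slide plus a timestamp.
Vehicle = namedtuple("Vehicle", "ts highway segment direction speed")


def range_window(stream, t, tau):
    """Time-based window [t - tau : t]; tau = 0 corresponds to Vehicles[Now]."""
    return [v for v in stream if t - tau <= v.ts <= t]


def rows_window(stream, n):
    """Count-based window: the last n tuples, as in Vehicles[Rows n]."""
    return stream[-n:] if n > 0 else []


# Example stream with made-up readings for one highway segment.
stream = [
    Vehicle(ts, "M25", 42, "north", speed)
    for ts, speed in [(1, 85), (2, 80), (4, 60), (7, 35), (8, 30)]
]

recent = range_window(stream, t=8, tau=5)   # tuples with 3 <= ts <= 8
last_two = rows_window(stream, 2)           # the two most recent tuples
```

A real engine would maintain these windows incrementally as tuples arrive instead of rescanning the stream on every evaluation.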

Providing Well-Defined Query Semantics
  • CQL: SQL-based declarative language for continuous queries
  • Based on well-defined relational algebra (select, project, join, ...)
  • Example: Identifying slow moving traffic on a highway
      • Find highway segments with average speed below 40 km/h

      select highway, segment, direction, AVG(speed) as avg
      from Vehicles[range 5 seconds slide 1 second]
      group by highway, segment, direction
      having avg < 40

Roadmap
  • Introduction to Stream Processing
  • (1) Scalable and Parallel Stream Processing
      ...but with Principled Query Semantics
  • (2) Streaming Machine Learning Applications
      ...but with a Natural Programming Model
  • Conclusions

How to Scale Big Data Systems?
  • Use scale out and not scale up
      • Commodity multi-core servers
      • Fast network interconnect
  • Data-parallelism is king
  • Software designed for failure
  • Google:
      • over 20 data centre locations
      • over 1 million servers
      • 260 Megawatts (0.01% of global energy)
      • 4.2 billion searches per day (2011)
      • Exabytes (10^18 bytes) of storage

Need to Exploit Parallel Hardware
  • Servers have many parallel CPU cores
  • Servers with GPUs common
  • GPUs have even more specialised cores
  [Figure: two CPU sockets, each with cores C1-C8, an L3 cache and DRAM, connected over the PCIe bus (DMA) to a GPU with a command queue, SMX1 ... SMXN, an L2 cache and DRAM; annotations: 10s of SMXs, 1000s of FP32 cores, 100s of FP64 cores, 10s of KB L2 cache, 10s of GB DRAM]

Task vs Data Parallelism
  [Figure: streams flow into n machines in a data centre, which produce results. The figure overlays several copies of the first query below and one copy of the second.]

      select distinct W.cid
      from Payments [range 300 seconds] as W,
           Payments [partition-by 1 row] as L
      where W.cid = L.cid and W.region != L.region

      select highway, segment, direction, AVG(speed)
      from Vehicles[range 5 seconds slide 1 second]
      group by highway, segment, direction
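As a rough illustration of data parallelism for the Vehicles query, the sketch below splits the contents of a single window by the group-by key, lets each partition compute its averages independently, and unions the partial results. The function names, the dictionary-based tuple layout, and the CRC32-based routing are assumptions made for this example; the slides do not prescribe a partitioning function.

```python
import zlib
from collections import defaultdict


def group_key(t):
    return (t["highway"], t["segment"], t["direction"])


def partition(tuples, n_workers):
    """Route each tuple to one worker based on a hash of its group-by key."""
    parts = [[] for _ in range(n_workers)]
    for t in tuples:
        idx = zlib.crc32(repr(group_key(t)).encode()) % n_workers
        parts[idx].append(t)
    return parts


def slow_segments(tuples, threshold=40):
    """AVG(speed) per (highway, segment, direction), keeping averages below threshold."""
    sums = defaultdict(lambda: [0.0, 0])
    for t in tuples:
        acc = sums[group_key(t)]
        acc[0] += t["speed"]
        acc[1] += 1
    return {k: s / c for k, (s, c) in sums.items() if s / c < threshold}


def parallel_slow_segments(tuples, n_workers):
    """Each partition could run on a different machine; here they run in a loop."""
    result = {}
    for part in partition(tuples, n_workers):
        result.update(slow_segments(part))
    return result


# Made-up contents of one 5-second window.
window = [
    {"highway": "M25", "segment": 42, "direction": "north", "speed": 30},
    {"highway": "M25", "segment": 42, "direction": "north", "speed": 40},
    {"highway": "M25", "segment": 43, "direction": "north", "speed": 80},
    {"highway": "M25", "segment": 43, "direction": "north", "speed": 90},
    {"highway": "M4", "segment": 7, "direction": "east", "speed": 20},
]
slow = parallel_slow_segments(window, n_workers=3)
```

Because all tuples of a group land in the same partition, the partial results never overlap and the union equals the sequential answer, whatever the number of workers.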

Use Apache Hadoop for Stream Processing?
  • MapReduce model
  • Data model: (key, value) pairs
  • Two processing functions:
      map(k1, v1) → list(k2, v2)
      reduce(k2, list(v2)) → list(v3)
  • Benefits:
      • Simple programming model
      • Transparent parallelisation
      • Fault-tolerant processing
  [Figure: map tasks (M) feed a shuffle phase followed by reduce tasks (R); partitioned data on HDFS distributed file system]

Executing Hadoop Jobs on Cluster
  • Map/reduce tasks scheduled across cluster nodes
  • Shuffle phase (repartitioning) introduces synchronisation barrier
  • MapReduce is a batch processing model

Apache Spark: Micro-Batching (UC Berkeley, SOSP'13)
  • Idea: Reduce size of data partitions to produce up-to-date, incremental results
  • Micro-batching for data
      • Input data is discretised stream
      • Compute tasks operate on micro-batch partition
      • Parallel recomputation of partition after failure
  • Challenges:
      • Scheduling overhead for small partitions?
      • Query semantics with micro-batches?

Apache Storm: Pipelined Dataflows
  • Idea: Materialise tasks in dataflow graph to avoid scheduling overhead
  • Many systems do this, e.g. Apache Storm, Apache Flink, SEEP, Google Dataflow, ...
  • Challenges:
      • Allocation of tasks to nodes?
      • Failure recovery?
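To make the micro-batching idea concrete, here is a minimal single-process sketch of a word count over a discretised stream. It does not use Spark's API: `micro_batches`, `word_count`, and `streaming_word_count` are hypothetical helpers, and the input is assumed to be a time-ordered list of (timestamp, line) pairs.

```python
from collections import Counter
from itertools import groupby


def micro_batches(events, interval):
    """Discretise time-ordered (timestamp, line) events into batches of `interval` seconds."""
    for batch_id, group in groupby(events, key=lambda e: int(e[0] // interval)):
        yield batch_id, [line for _, line in group]


def word_count(lines):
    """Batch-style count over one micro-batch."""
    return Counter(word for line in lines for word in line.split())


def streaming_word_count(events, interval):
    """Emit the running totals once per micro-batch."""
    totals = Counter()
    for batch_id, lines in micro_batches(events, interval):
        totals.update(word_count(lines))
        yield batch_id, dict(totals)


# Made-up input: four lines arriving over three seconds.
events = [(0.1, "a b"), (0.7, "a"), (1.2, "b c"), (2.5, "a")]
results = list(streaming_word_count(events, interval=1.0))
```

Results only become visible at batch boundaries, so the batch interval puts a floor under the result latency, which is the trade-off the following slides examine.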

Latency Impact of Micro-Batches
  • Streaming word count of text data
  • Deployed on 4 nodes (4-core 3.4 GHz Intel Xeon with 8 GB RAM)
  [Figure: latency plot; the only surviving series label is SEEP]

Parallel Processing with Sliding Windows

      select highway, segment, direction, AVG(speed) as avg
      from cars [range 5 seconds slide 1 second]
      group by highway, segment, direction
      having avg < 40

  [Figure: tuples 1-7 covered by overlapping windows w1-w4, which are assigned as tasks to Worker A and Worker B]
  • Synchronise to output results in order
  • Leads to redundant computation

No Redundant Computation with Panes
  • Pane: smallest unit of parallelism without data dependencies based on window semantics
  • Pane size = gcd(window size, slide)
  [Figure: tuples 1-7 divided into panes p1-p8]
  • Panes can be processed in parallel
  • Window results assembled from pane results

How to Relate Panes to Tasks?
  • Spark imposes lower bound on window slide
      • Micro-batch size limited by window slide
      • Window slide limited by min. latency (~500 ms)
  [Figure: throughput (10^6 tuples/s) against window slide, with series labelled 1s to 5s]
  • Avoid coupling query performance with query definition

SABER: Window Fragment Model (Imperial, SIGMOD'16)
  • Idea: Decouple task size from window size/slide
      • e.g. 5 tuples/task, window size 7 rows, slide 2 rows
  [Figure: tuples 1-15 grouped into tasks T1, T2 and T3 and covered by windows w1-w5]
  • Task contains one or more window fragments
      • Closing/pending/opening windows in T2
  • Workers can process fragments incrementally

Merge Window Fragment Results
  • Idea: Decouple task size from window size/slide
      • Assemble window fragment results
      • Output them in correct order
  [Figure: Worker A processes T1 and Worker B processes T2; fragment results for w1-w5 are placed in slots of a circular buffer in the result stage, from which the w1 and w2 results are output]
  • Worker B stores T2 results and exits (nothing to forward)
  • Worker A stores T1 results, merges window fragment results and forwards complete windows downstream

SABER: Window Performance

      select AVG(S.1) from S [rows 1024 slide x]

  [Figure: SABER throughput (GB/s, 0 to 8) and SABER latency (sec, 0 to 0.2) against window slide (bytes, 64 to 65536)]
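The window fragment example from the SABER slides (5 tuples per task, window size 7 rows, slide 2 rows) can be worked through with a short sketch. This is not SABER's implementation: the function names, the 1-based numbering of tuples, tasks and windows, and the extra "complete" state for a window that fits entirely in one task are assumptions made for illustration.

```python
def window_bounds(w, size, slide):
    """First and last tuple index (1-based, inclusive) of window w (1-based)."""
    start = (w - 1) * slide + 1
    return start, start + size - 1


def fragments(task, task_size, size, slide):
    """Classify the window fragments that fall into one task.

    Returns {window: (state, first_tuple, last_tuple)}, where state is
    "opening" (window starts in this task), "closing" (window ends in it),
    "pending" (window spans the whole task) or "complete" (starts and ends in it).
    """
    t_start = (task - 1) * task_size + 1
    t_end = task * task_size
    out = {}
    w = 1
    while True:
        w_start, w_end = window_bounds(w, size, slide)
        if w_start > t_end:          # later windows start after this task
            break
        if w_end >= t_start:         # window overlaps the task
            opens = w_start >= t_start
            closes = w_end <= t_end
            if opens and closes:
                state = "complete"
            elif opens:
                state = "opening"
            elif closes:
                state = "closing"
            else:
                state = "pending"
            out[w] = (state, max(w_start, t_start), min(w_end, t_end))
        w += 1
    return out


# Task T2 covers tuples 6-10 in the slide's example.
t2 = fragments(task=2, task_size=5, size=7, slide=2)
```

Under these assumptions T2 holds closing fragments of w1 and w2, a pending fragment of w3, and opening fragments of w4 and w5, which matches the "closing/pending/opening windows in T2" remark on the slide.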

Roadmap
  • Introduction to Stream Processing
  • (1) Scalable and Parallel Stream Processing
      ...but with Principled Query Semantics
  • (2) Streaming Machine Learning Applications
      ...but with a Natural Programming Model
  • Conclusions

Democratising Big Data Analytics
  • Need to enable more users to do (streaming) analytics

Programming Language Popularity
  [Figure not preserved]

Programming Models For Streaming Apps?
  • Declarative query languages (e.g. CQL) challenging for complex algorithms
      • Consider iterative machine learning algorithms over streaming data
      • Typically need to use user-defined functions (UDFs)
  • Domain experts tend to write imperative programs
      • Java, Matlab, C++, R, Python, Fortran, ...
  • Distributed dataflow frameworks favour functional programming models
      • MapReduce, SQL, PIG, DryadLINQ, Spark, ...
      • Simplifies consistency and fault tolerance

ML Example: Recommender System
  • Recommendations based on past user behaviour through collaborative filtering (cf. Netflix, Amazon, ...)
  [Figure: customer activity on a website (e.g. User A rates the item iPhone) flows into a distributed dataflow graph (e.g. Hadoop, Spark, Dryad, Flink, Naiad, ...), which produces up-to-date recommendations (e.g. recommend Apple Watch to User A)]

Online Collaborative Filtering in Java

      Matrix userItem = new Matrix();
      Matrix coOcc = new Matrix();

      // Update with new ratings
      void addRating(int user, int item, int rating) {
          userItem.setElement(user, item, rating);
          updateCoOccurrence(coOcc, userItem);
      }

      // Multiply for recommendation
      Vector getRec(int user) {
          Vector userRow = userItem.getRow(user);
          Vector userRec = coOcc.multiply(userRow);
          return userRec;
      }

  User-Item matrix (UI):

               User-A   User-B
      ItemA    4        0
      ItemB    5        5

  [Figure: the co-occurrence matrix (CO) over ItemA and ItemB is multiplied with a user's row to obtain the recommendation]

Collaborative Filtering in Spark (Java)

      // Build the recommendation model using ALS
      int rank = 10;
      int numIterations = 20;
      MatrixFactorizationModel model =
          ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

      // Evaluate the model on rating data
      JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
          new Function<Rating, Tuple2<Object, Object>>() {
              public Tuple2<Object, Object> call(Rating r) {
                  return new Tuple2<Object, Object>(r.user(), r.product());
              }
          }
      );
      JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
          model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
              new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
                  public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                      return new Tuple2<Tuple2<Integer, Integer>, Double>(
                          new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
                  }
              }
          ));
      JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(ratings.map(
          new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Double>>() {
              public Tuple2<Tuple2<Integer, Integer>, Double> call(Rating r) {
                  return new Tuple2<Tuple2<Integer, Integer>, Double>(
                      new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
              }
          }
      )).join(predictions).values();

Collaborative Filtering in Spark (Scala)

      // Build the recommendation model using ALS
      val rank = 10
      val numIterations = 20
      val model = ALS.train(ratings, rank, numIterations, 0.01)

      // Evaluate the model on rating data
      val usersProducts = ratings.map {
        case Rating(user, product, rate) => (user, product)
      }
      val predictions = model.predict(usersProducts).map {
        case Rating(user, product, rate) => ((user, product), rate)
      }
      val ratesAndPreds = ratings.map {
        case Rating(user, product, rate) => ((user, product), rate)
      }.join(predictions)

  • All data is immutable, no fine-grained model updates

Streaming State as First Class Citizen
  • Tasks process stream data
  • State Elements (SEs) represent transient state
  • Dataflows represent streams
  [Figure: tasks connected by dataflows, with a user-item matrix (User A, User B; Item 1, Item 2) as a state element]
  • State elements (SEs) are mutable in-memory data structures
      • Tasks have local access to SEs
      • SEs can be shared between tasks

Challenges with Large Streaming State

      Matrix userItem = new Matrix();
      Matrix coOcc = new Matrix();

  • Big Data problem: Matrices become large
  • State will not fit into single node
  • Challenge: Handling of distributed state?

Partitioned State Elements
  • Idea: Partitioned SE split into disjoint partitions
      • Key space [0-N] split into [0-k] and [(k+1)-N]
  • User-Item matrix (UI), access by key:

                ItemA   ItemB
      User-A    4       5
      User-B    0       5

  • State partitioned according to partitioning key
  • Dataflow routed according to hash function: hash(msg.id)
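A partitioned state element can be approximated by a structure that splits the user-item matrix into disjoint partitions and routes every access by key. The sketch below is a single-process illustration, not SEEP's implementation: the class name `PartitionedUserItem`, its methods, and the use of a modulo on the integer user id as a stand-in for hash(msg.id) are all assumptions.

```python
class PartitionedUserItem:
    """User-item matrix split into disjoint partitions keyed by user id."""

    def __init__(self, n_partitions):
        # In a distributed deployment each dict would live on a different node.
        self.partitions = [dict() for _ in range(n_partitions)]

    def _route(self, user):
        # Stand-in for the hash function that routes the dataflow.
        return user % len(self.partitions)

    def add_rating(self, user, item, rating):
        """Mutable, fine-grained update that touches exactly one partition."""
        row = self.partitions[self._route(user)].setdefault(user, {})
        row[item] = rating

    def get_row(self, user):
        """Local read of one user's ratings from the partition that owns the key."""
        return self.partitions[self._route(user)].get(user, {})


# Ratings from the slide's example, with users and items numbered from 0.
ui = PartitionedUserItem(n_partitions=2)
ui.add_rating(0, 0, 4)   # User-A rates ItemA
ui.add_rating(0, 1, 5)   # User-A rates ItemB
ui.add_rating(1, 1, 5)   # User-B rates ItemB
```

Since every key maps to exactly one partition, a task that receives all messages for a given user can update that user's row locally without coordinating with other nodes.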

Partial State Elements
  • Partial SEs are replicated (when partitioning is impossible)
      • Tasks have local access
  • Access to partial SEs either local or global
      • Local access: Data items sent to one
      • Global access: Data items sent to all

Stateful Dataflow Graphs (SDG) (Imperial, USENIX ATC'14)
  • SEEP distributed dataflow framework
  • Annotated Java program (@Partitioned, @Partial, @Global, ...): Program.java
  • Static program analysis
  • Data-parallel Stateful Dataflow Graph (SDG)
  • Translation & checkpoint-based fault tolerance
  • Cluster

SDG: Online Logistic Regression
  • 100 GB training dataset for classification
  • Deployed on Amazon EC2 (m1.xlarge VMs with 4 vCPUs and 16 GB RAM)
  [Figure: throughput of SDG and Spark]
  • SDGs have comparable throughput to Spark despite mutable state

Summary
  • Scalable stream processing key component in today's Big Data stacks
      • Many applications and services require real-time view of data streams
      • Online machine learning is growing in importance
  • (1) Modern parallel hardware (multicore CPUs/GPUs) raises challenges
      • Need new system designs that exploit data parallelism
      • Must not compromise well-founded query semantics
      • SABER: Windows handling in parallel streaming engine
  • (2) Ease-of-use key in adoption of streaming platforms
      • Complex streaming applications require expressive programming models
      • Want to provide programming abstractions that are natural to users

Thank you! Any Questions?

Peter Pietzuch
http://lsds.doc.ic.ac.uk
