16/07 - Micro-batching - High performance writes by Summit16

1. Micro-batching: High performance writes
Adam Zegelin, Instaclustr

2. Me
Adam Zegelin
• Co-founder of Instaclustr
Instaclustr
• Managed Apache Cassandra & DSE + Spark/Zeppelin in the ☁
• Amazon Web Services, Azure, IBM Softlayer
• 24×7×365 support
• Cassandra & Spark consulting
• Enterprise support-only contracts

3. Problem Background
• Metrics: 2,000+ metrics (events) per node, collected every 20 seconds
• ~50k events per second
• Streamed via RabbitMQ to Riemann
• Riemann, by Aphyr
  • Event stream processor framework, written in Clojure (JVM)
  • Analyses, combines and filters events
  • Rules: event matching, over threshold, etc.
  • Actions: PagerDuty, email, Slack, etc.
  • Forwards everything to Cassandra
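As a rough consistency check on these figures (assuming the ~50k events/s comes entirely from the per-node metrics above), the two rates together imply a fleet on the order of 500 nodes:

```latex
\frac{2{,}000\ \text{events/node}}{20\ \text{s}} = 100\ \text{events/s per node},
\qquad
\frac{\sim 50{,}000\ \text{events/s}}{100\ \text{events/s per node}} \approx 500\ \text{nodes}
```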

4. Problem Background cont’d
• Riemann:
  • Initially, a single instance
  • Fairly complicated to scale (and make highly available)
  See my blog post "Post 500 Nodes: High Availability & Scalability with Riemann"
• Profiling: writing to Cassandra is expensive (CPU-intensive) on the client
• Less CPU time to process events = backlog of events = 🔥
• End goal: reduce the CPU time spent writing to Cassandra
• Batching is an applicable solution for our use case

5. Micro-batching
• Insert data using small batches of Statements
• Improves throughput
• Reduces network overhead
• Less is more
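A minimal sketch of the plain micro-batching step, written in plain Java so it runs without the driver: a list of items, standing in for bound Statements, is cut into consecutive groups of at most n. The class and method names are hypothetical; with the driver, each group would be added to its own BatchStatement and submitted with executeAsync(…).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: splits a list into consecutive micro-batches of at
// most batchSize items. Items stand in for bound Statements; each returned
// sub-list would become one BatchStatement.
final class MicroBatcher {
    private MicroBatcher() {}

    static <T> List<List<T>> chunk(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sub-list so each batch is independent of the input list.
            batches.add(Collections.unmodifiableList(new ArrayList<>(items.subList(start, end))));
        }
        return batches;
    }
}
```

Guava's Lists.partition(list, size) performs the same split, but returns views over the original list rather than copies.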

6. Partition-aware Batching
• Batches of Statements, where every statement in a batch is routed to the same replica (host)
• LoadBalancingPolicy.newQueryPlan(…)
  • Returns an Iterator<Host>, in order of preference
  • With TokenAwarePolicy, the replicas responsible for the Statement’s partition key come first
• Group by the first (most preferred) host of each statement’s query plan:

    // Multimaps is Guava's com.google.common.collect.Multimaps
    LoadBalancingPolicy lbp = cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
    Multimap<Host, Statement> groups = Multimaps.index(statements,
        s -> lbp.newQueryPlan(s.getKeyspace(), s).next()  // first host of the query plan
    );
    // equivalent to Map<Host, List<Statement>>
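The grouping above can be extended into complete host-aware micro-batches: group first, then split each group into batches of at most n. This sketch keeps the technique but swaps Guava and the driver for plain Java so it runs standalone. The routingKey function plays the role of s -> lbp.newQueryPlan(s.getKeyspace(), s).next(), and each inner list would become one UNLOGGED BatchStatement. HostAwareBatcher and groupAndChunk are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper: groups items by a routing key (standing in for the
// first Host of each Statement's query plan), then splits every group into
// batches of at most batchSize items, so each batch targets a single key.
final class HostAwareBatcher {
    private HostAwareBatcher() {}

    static <K, T> Map<K, List<List<T>>> groupAndChunk(
            List<T> items, Function<? super T, ? extends K> routingKey, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        // Step 1: group by routing key, preserving first-seen key order.
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T item : items) {
            groups.computeIfAbsent(routingKey.apply(item), k -> new ArrayList<>()).add(item);
        }
        // Step 2: split every group into micro-batches of at most batchSize.
        Map<K, List<List<T>>> batches = new LinkedHashMap<>();
        for (Map.Entry<K, List<T>> e : groups.entrySet()) {
            List<T> group = e.getValue();
            List<List<T>> chunks = new ArrayList<>();
            for (int start = 0; start < group.size(); start += batchSize) {
                int end = Math.min(start + batchSize, group.size());
                chunks.add(new ArrayList<>(group.subList(start, end)));
            }
            batches.put(e.getKey(), chunks);
        }
        return batches;
    }
}
```

Grouping before chunking is what distinguishes this from the plain strategy: a plain batch of n statements may fan out to many replicas, whereas every batch here goes to one preferred replica.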

7. Benchmark
Insert 1 million rows

8. Benchmark Overview
• Write 1 million rows via a variety of strategies:
  • Individual statements, batches & host-aware batches
  • Batch sizes: 10, 50, 100, 250, 500 individual INSERTs
  • Consistency levels: ALL, LOCAL_QUORUM, LOCAL_ONE
  • UNLOGGED batches
• Each strategy executed 10 times; average and standard deviation reported
• Determine:
  • Fastest
  • Lowest client CPU usage
  • Lowest cluster CPU usage
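Summarising the 10 runs per strategy needs only a mean and a standard deviation. The sketch below is a hypothetical helper; it assumes the sample (n - 1) standard deviation, since the slides do not say which form was reported.

```java
// Hypothetical helper for summarising repeated benchmark runs.
// Assumption: sample (n - 1) standard deviation; the slides do not specify.
final class RunStats {
    private RunStats() {}

    static double mean(double[] xs) {
        if (xs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        double sum = 0;
        for (double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    static double sampleStdDev(double[] xs) {
        if (xs.length < 2) {
            throw new IllegalArgumentException("need at least two samples");
        }
        double m = mean(xs);
        double squares = 0;
        for (double x : xs) {
            squares += (x - m) * (x - m);
        }
        return Math.sqrt(squares / (xs.length - 1));
    }
}
```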

9. Write Strategies
Individual Writes
Each BoundStatement (bound from the PreparedStatement) is submitted to Cassandra via executeAsync(…).
Batch Writes
Groups of n statements are combined into BatchStatements, one per group. Each BatchStatement is submitted via executeAsync(…).
Partition Aware Batch Writes
Groups of n statements, where every statement in the group is routed to the same replica host, are combined into BatchStatements, one per group. Each BatchStatement is submitted via executeAsync(…).
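All three strategies share one submission pattern: issue every request asynchronously first, and only then wait on the futures. A hedged, driver-free sketch of that pattern follows, with CompletableFuture standing in for ResultSetFuture and a hypothetical send function standing in for session.executeAsync(…). AsyncSubmitter is likewise a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

// Hypothetical sketch of "submit everything, then wait for everything".
// `send` stands in for session.executeAsync(...); CompletableFuture stands
// in for the driver's ResultSetFuture.
final class AsyncSubmitter {
    private AsyncSubmitter() {}

    static <R, S> List<S> submitAllThenAwait(List<R> requests, Function<R, CompletableFuture<S>> send)
            throws InterruptedException, ExecutionException {
        // Phase 1: fire off every request without blocking.
        List<CompletableFuture<S>> futures = new ArrayList<>(requests.size());
        for (R request : requests) {
            futures.add(send.apply(request));
        }
        // Phase 2: block on each future in turn; returns once all have completed.
        List<S> results = new ArrayList<>(futures.size());
        for (CompletableFuture<S> f : futures) {
            results.add(f.get());
        }
        return results;
    }
}
```

Keeping submission and waiting in separate phases lets the driver pipeline many requests over its pooled connections, instead of paying a full round trip per statement.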

10. Timing
The stopwatch is stopped once ResultSetFuture.get() has returned for every future returned by executeAsync(…).
Runtime
Wall-clock time: System.nanoTime() via Guava’s Stopwatch
CPU Time
Total CPU time: com.sun.management.OperatingSystemMXBean.getProcessCpuTime()
Sum of the CPU time used by all threads in the JVM. Does not include I/O wait time.
Cluster Average CPU Time
Average of the total CPU time across all nodes in the cluster, collected via JMX
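A sketch of the two client-side measurements around a workload, assuming a HotSpot/OpenJDK JVM, where the platform OperatingSystemMXBean is a com.sun.management.OperatingSystemMXBean. WorkloadTimer is a hypothetical name; it calls System.nanoTime() directly, which is the clock Guava's Stopwatch uses by default.

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper: measures wall-clock time and JVM process CPU time
// around a workload. Returns {wallNanos, cpuNanos}; cpuNanos is -1 when the
// JVM cannot report process CPU time.
final class WorkloadTimer {
    private WorkloadTimer() {}

    static long[] time(Runnable workload) {
        java.lang.management.OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        com.sun.management.OperatingSystemMXBean sunOs =
                os instanceof com.sun.management.OperatingSystemMXBean
                        ? (com.sun.management.OperatingSystemMXBean) os
                        : null;

        // getProcessCpuTime() returns -1 if unsupported on this platform.
        long cpuStart = sunOs != null ? sunOs.getProcessCpuTime() : -1;
        long wallStart = System.nanoTime();

        workload.run(); // e.g. submit every request, then block on every future

        long wallNanos = System.nanoTime() - wallStart;
        long cpuNanos = (sunOs != null && cpuStart >= 0) ? sunOs.getProcessCpuTime() - cpuStart : -1;
        return new long[] {wallNanos, cpuNanos};
    }
}
```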

11. Benchmark Setup
Cluster
• Apache Cassandra 3.7
• 9-node cluster (3 racks, 3 nodes per rack)
• m4.large: 250 GB of EBS SSD, 2 vCPUs, 8 GB RAM
• NetworkTopologyStrategy, RF = 3
• cassandra.yaml:
  • Increased batch_size_fail_threshold_in_kb (and batch_size_warn_threshold_in_kb, to reduce log noise)
  • Increased write_request_timeout_in_ms
• Disabled compactions (nodetool disableautocompaction) & auto-snapshots, to reduce variance between benchmark runs due to background tasks

12. Benchmark Setup cont’d
Client
• Single c4.xlarge: 7.5 GB RAM, 4 vCPUs
• OpenJDK 1.8, DataStax Cassandra Java Driver 3.1
• TokenAwarePolicy wrapping DCAwareRoundRobinPolicy
• Tweaked connection pool & socket options:
  • Max requests per connection: 32,000 for LOCAL & REMOTE
  • Max connections per host: 20 for LOCAL & REMOTE
  • Pool timeout: 50 sec
  • Socket read timeout: 50 sec
• Up-front prepare(…) & bind(…) of all 1 million statements
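A rough upper bound implied by these pool settings, assuming all 9 nodes sit in the client's local data center (so every host is at LOCAL distance):

```latex
20\ \tfrac{\text{connections}}{\text{host}} \times 32{,}000\ \tfrac{\text{requests}}{\text{connection}}
  = 640{,}000\ \text{in-flight requests per host},
\qquad
9 \times 640{,}000 = 5{,}760{,}000 > 1{,}000{,}000\ \text{statements}
```

Under that assumption, the pools could in principle hold every one of the 1 million statements in flight at once, even when each is submitted individually.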

13. Benchmark Setup cont’d
Table/CF & Data
• Resembles our Riemann schema
• Keyspace DROPped and re-CREATEd for every benchmark run
• Generate:
  • 1000 random hosts (UUIDs) × 1000 random services (UUIDs)
  • Static values for bucket_time, time, metric & state

    CREATE TABLE microbatching.example (
        host text,
        bucket_time timestamp,
        service text,
        time timestamp,
        metric double,
        state text,
        PRIMARY KEY ((host, bucket_time, service), time)
    ) WITH CLUSTERING ORDER BY (time DESC);
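Because bucket_time is static, every (host, service) pair maps to its own partition (host, bucket_time, service), and because time is static too, each partition holds exactly one row. The hypothetical generator below mirrors that setup in plain Java and counts distinct partition keys. With 1000 × 1000 it would report 1,000,000, barring UUID collisions. ExampleData, Row and the static timestamp value are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical generator for rows shaped like microbatching.example.
final class ExampleData {
    private ExampleData() {}

    static final long STATIC_BUCKET_TIME = 0L; // hypothetical static value

    static final class Row {
        final String host;
        final long bucketTime;
        final String service;

        Row(String host, long bucketTime, String service) {
            this.host = host;
            this.bucketTime = bucketTime;
            this.service = service;
        }

        // The table's partition key is (host, bucket_time, service).
        List<Object> partitionKey() {
            List<Object> key = new ArrayList<>(3);
            key.add(host);
            key.add(bucketTime);
            key.add(service);
            return key;
        }
    }

    static List<Row> generate(int hostCount, int serviceCount) {
        List<String> hosts = new ArrayList<>();
        for (int i = 0; i < hostCount; i++) hosts.add(UUID.randomUUID().toString());
        List<String> services = new ArrayList<>();
        for (int i = 0; i < serviceCount; i++) services.add(UUID.randomUUID().toString());

        // Cross product: one row per (host, service) pair.
        List<Row> rows = new ArrayList<>(hostCount * serviceCount);
        for (String h : hosts) {
            for (String s : services) {
                rows.add(new Row(h, STATIC_BUCKET_TIME, s));
            }
        }
        return rows;
    }

    static int distinctPartitions(List<Row> rows) {
        Set<List<Object>> keys = new HashSet<>();
        for (Row r : rows) keys.add(r.partitionKey());
        return keys.size();
    }
}
```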

14-16. [Benchmark result charts]

17. Outcome
• More performant:
  • Shorter runtime
  • Lower client & cluster CPU load
• Possibly higher latency, so better suited to bulk data processing
• Not a silver bullet:
  • Standard INSERTs work well for most use cases
• Cassandra benchmarking is hard

18. Source
The Java source code for this benchmark is available at: bitbucket.org/adamzegelin/microbatching

19. Thank you!
Questions?