Reducing the Overhead of Tasks in Ciel

Ross Lagerwall

March 20, 2013

1 Introduction

Ciel is a distributed execution engine which supports dynamic control flow [6]. This means that tasks can start other tasks, which makes the model flexible and allows many different types of algorithm to be built on top of the engine. However, starting a task carries a significant overhead, a fact that programmers writing algorithms on top of Ciel need to consider. This report investigates the costs of starting a task and how these costs can be reduced, with the goal of reducing the overall execution time of a Ciel computation.

2 Ciel

Before investigating the costs of task startup, I will give an overview of Ciel. Ciel is a distributed execution engine in which tasks are arranged in a directed acyclic graph (DAG). The DAG can be created dynamically by other tasks, which gives some flexibility compared with other systems such as Dryad, whose DAG must be fully specified before the job starts [6].

The Ciel architecture, shown in Figure 1, consists of three different types of nodes: masters, workers and clients. There is only one client – this is the node that submits the job to be processed. There is usually only one master node (although fault tolerance may use a secondary master), which controls the execution of jobs. It keeps three tables: a table of the tasks in the system, a table of the objects in the system, and a table of the workers in the system. The master is responsible for scheduling tasks to be executed on workers. Workers register with a master at startup and then wait to be assigned tasks.

Objects are stored locally on workers. To make use of an object which is not stored locally, a worker must either fetch the object from another worker or, in certain cases, it may stream the object. Ciel talks about objects using either concrete references or future references. Both name the object, but concrete references also list the locations where the object may be found. Tasks can be made dependent on references so that they only start when the references become concrete. Concrete references may either be given as inputs by the client at the start of the job or may be published by other tasks. Tasks are non-blocking, which means that a special trick is needed to allow tasks to dynamically wait for references to become concrete: the task creates a new task, called a continuation task, which depends on the reference and so 'continues' the original task after the reference becomes available. Tasks are actually executed by executors.
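The distinction between future and concrete references described above can be sketched as follows. This is a simplified model for illustration only; the real Ciel reference classes carry more metadata (type tags, sizes, streaming hints) and have different names.

```python
# A minimal sketch of Ciel's two reference kinds, as described above.
# Class and field names here are illustrative, not the real Ciel API.

class FutureReference:
    """Names an object that has not yet been produced anywhere."""
    def __init__(self, object_id):
        self.object_id = object_id

class ConcreteReference:
    """Names an object and lists the workers that hold a copy of it."""
    def __init__(self, object_id, locations):
        self.object_id = object_id
        self.locations = set(locations)

    def merge(self, other):
        # Learning of additional replicas only ever adds locations.
        assert self.object_id == other.object_id
        self.locations |= other.locations

# Once some task publishes the object, a concrete reference names it
# along with every worker known to store it:
ref = ConcreteReference("obj:1a2b", ["worker-1"])
ref.merge(ConcreteReference("obj:1a2b", ["worker-2"]))
print(sorted(ref.locations))  # ['worker-1', 'worker-2']
```

A task made dependent on a `FutureReference` stays pending until a matching `ConcreteReference` is published, which is what the continuation-task trick relies on.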

Figure 1: An overview of the Ciel architecture.

Executors run the user-specified task code. There are executors for many languages, including Java, Scala and Skywriting, a custom language for Ciel. Ciel defines a protocol for communication between executors and the worker which lets the user-specified code open references, publish references, spawn new tasks, and so on.

Several other systems provide for data-centric programming. MapReduce provides a simple model based on a functional programming paradigm [3], but it is fairly limited in the kinds of algorithms that it supports. Dryad provides a DAG-based model similar to Ciel's, but it does not support tasks dynamically spawning other tasks [4]. HaLoop adds iterative support to MapReduce [2], while Incoop adds incremental support [1]. Finally, Naiad supports both incremental and iterative computations but does not yet have a distributed implementation [5].
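The worker-executor protocol just described can be illustrated with a short sketch. The operation and field names below are hypothetical simplifications (the real Ciel message format may differ); the point is that every request is a small JSON-serializable structure exchanged over the pipe.

```python
import json

# Hypothetical constructors for worker-executor protocol messages.
# Operation and field names are illustrative, not the real Ciel protocol.

def open_ref_message(ref_id):
    """Ask the worker for a local filename holding the given object."""
    return {"op": "open_ref", "ref": ref_id}

def publish_message(ref_id, locations):
    """Tell the worker that an object has been produced."""
    return {"op": "publish", "ref": ref_id, "locations": locations}

def spawn_message(executor_name, dependencies):
    """Ask the worker to spawn a new task with the given dependencies."""
    return {"op": "spawn", "executor": executor_name, "deps": dependencies}

# Messages cross the pipe as JSON, so they must round-trip cleanly:
msg = spawn_message("java", ["obj:1", "obj:2"])
assert json.loads(json.dumps(msg)) == msg
```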

3 Motivation

The Ciel paper provides an illuminating example of why it would be useful to reduce the overhead of starting tasks in Ciel. The example is based on the Smith-Waterman sequence alignment algorithm. The algorithm implementation divides the processing into blocks, executing one block per task. When using n × n blocks, the maximum parallelism is n. This indicates that it is useful to use more blocks to extract parallelism. However, Murray et al. note that when using more than 30 × 30 blocks, the overhead of starting tasks becomes a significant factor in the overall computation and the total execution time increases. Reducing the overhead of task dispatch would allow a larger number of blocks to be used, increasing the possibility for parallelism and ultimately decreasing execution time.

Another example concerns BSP-style graph processing. Little computation is performed per vertex, which means that the natural implementation of using one vertex per task would have too high an overhead. Implementing BSP on top of Ciel would require each task to execute the computation for a number of vertices. Reducing the overhead of starting a task would reduce the impedance mismatch between vertices and tasks and allow more fine-grained tasks.

4 Profiling

Ciel is a complex, distributed system, which makes profiling performance issues difficult. Given a simple setup with a master and a single worker, there are several places where processor time is used. The three obvious places are within the master Ciel process, within the worker Ciel process, and within the actual process which does the computation (for example, a Java process). More subtly, each Ciel process starts a lighttpd web server to use for communication between the Ciel processes. In addition to the processing time used, another potentially significant factor is the latency of communication between these processes. The majority of this latency comes from the communication between workers and the master, since this usually needs to traverse an Ethernet connection, but latency is also incurred by context switching between the lighttpd process and the Ciel process, and between the Ciel process and the executor.

The profiling performed here uses the NumberInc benchmark described in Section 6.1. In a test execution with a single master and worker running across a LAN, the total time to return the result to the client was 28.5s, while the master used 6.6s of CPU time, the worker used 9.2s and the executor used 7s. Adding all these times up gives a total of 22.8s of CPU time, which means that 5.7s, or 20% of the total execution time, is lost to latencies.

A sample profile for the master, the worker and the executor is shown in Appendix A. The results from the master and worker show that the profile is flat. In other words, there is no single 'hot' spot in the code where all the CPU time is used. This makes optimization difficult because no single improvement has a dramatic effect on the overall execution time.
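The latency figure above is simple bookkeeping: wall-clock time minus the summed CPU time of all three processes. Using the measurements from the test execution:

```python
# Figures from the NumberInc test execution described above.
total_wall = 28.5                                       # seconds, as seen by the client
cpu = {"master": 6.6, "worker": 9.2, "executor": 7.0}   # CPU seconds per process

total_cpu = sum(cpu.values())       # 22.8 s
latency = total_wall - total_cpu    # 5.7 s not attributable to any process
share = latency / total_wall        # 0.20, i.e. 20% of the execution time

print(round(total_cpu, 1), round(latency, 1), round(share, 2))  # 22.8 5.7 0.2
```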

5 Optimizations

Several different optimizations were made based on the profiling results.

5.1 Direct TCP Connection

When the master wants to start a task on a worker, it uses PycURL to make a POST request to the worker's lighttpd web server. Lighttpd is configured such that any URL which begins with /control/ is passed through a socket to CherryPy using the FastCGI protocol. CherryPy is a web framework which runs as part of Ciel; it passes the request to different Ciel functions depending on the URL. The same process is performed in the opposite direction when the worker needs to tell the master that a task is complete. PycURL can maintain persistent HTTP connections, but these may time out, in which case the connection needs to be reestablished, resulting in increased latency.

The above procedure results in several context switches and some unnecessary interprocess communication which could be reduced by using direct TCP sockets. So, the first optimization made as part of this work was to use direct TCP sockets between the master and each worker for task communication. This effectively cuts out the overhead of using lighttpd and CherryPy, which should reduce the latency seen in the profiling results. The same TCP connection can be used in both directions: for the master scheduling tasks on a worker and for a worker communicating task results back to the master. The implementation currently only uses this connection for task communication, but it could eventually be used for all communication, which would remove the dependency on CherryPy and lighttpd.
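A minimal sketch of such a persistent, bidirectional connection is shown below, assuming length-prefixed JSON framing; the actual wire format used in the implementation may differ.

```python
import json
import socket
import struct

HEADER = struct.Struct("!I")  # 4-byte message length, network byte order

def send_message(sock, payload):
    """Serialize payload as JSON and send it with a length prefix."""
    body = json.dumps(payload).encode("utf-8")
    sock.sendall(HEADER.pack(len(body)) + body)

def _recv_exactly(sock, n):
    """Read exactly n bytes, looping over short reads."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("peer closed the connection")
        buf += chunk
    return buf

def recv_message(sock):
    """Read one length-prefixed JSON message from the socket."""
    (length,) = HEADER.unpack(_recv_exactly(sock, HEADER.size))
    return json.loads(_recv_exactly(sock, length).decode("utf-8"))

if __name__ == "__main__":
    # The same connection serves both directions: task dispatch one way,
    # task-completion reports the other.
    a, b = socket.socketpair()
    send_message(a, {"op": "start_task", "task_id": "t1"})
    print(recv_message(b))  # {'op': 'start_task', 'task_id': 't1'}
    a.close()
    b.close()
```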

5.2 Worker-executor Communication

5.2.1 Reference Prefetching

The worker communicates with executors by sending JSON-encoded messages over read and write pipes. This is how the various methods available to an executor are provided, such as spawning tasks, tail-spawning, or blocking on references. With the Java executor, the worker would send a task to the executor and then the executor would request several references from the worker, corresponding to the various jar files needed to execute the task. This is wasteful – it introduces many context switches and unnecessary encoding and decoding of JSON messages – so the worker was changed to prefetch the references needed for task startup and pass their filenames along with the task message. This resulted in a significant speedup.

5.2.2 Combining Messages

Some of the methods in the Ciel Java bindings require sending several messages between the worker and the executor. Many of these can be combined which, as before, reduces context switches and the number of JSON messages that need to be encoded and decoded. Previously, spawning a task required the following messages to be sent: allocate_output, open_output, close_output, and spawn. First, an object name is allocated; then the filename corresponding to the object is retrieved; then the executor writes out the task object to this file; then the reference is closed; and, finally, the actual spawn message is sent. This was changed to use only two messages by combining the first two into an allocate_open message and changing the semantics of spawn to take an additional reference which is closed by the worker when it handles the spawn message. The number of messages for the tailSpawn() method was reduced using the same changes as for the spawn() method. Previously, the blockOn() method sent two messages: tail_spawn and exit. This was changed to use a single tail_spawn_exit message which combines the functionality of both. These changes also resulted in a significant speedup.
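The before and after message flows for spawning a task can be sketched as below. The operation names come from the description above, but the payloads are hypothetical simplifications of the real protocol.

```python
# Spawning one task, before the change: four worker round trips.
spawn_before = [
    {"op": "allocate_output"},           # 1. worker allocates an object name
    {"op": "open_output", "index": 0},   # 2. worker returns a filename
    # ... executor writes the serialized task object to that file ...
    {"op": "close_output", "index": 0},  # 3. the reference is closed
    {"op": "spawn"},                     # 4. the actual spawn request
]

# After the change: two round trips.
spawn_after = [
    {"op": "allocate_open"},             # allocate + open combined
    {"op": "spawn", "close_index": 0},   # spawn now also closes the output
]

saved = len(spawn_before) - len(spawn_after)
print(saved)  # 2 fewer round trips (and JSON encode/decode cycles) per spawn
```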

5.3 Pickle

Pickle is a serialization library that is part of the Python standard library (http://docs.python.org/2/library/pickle.html). It is used extensively throughout Ciel. Appendix A.2 shows that many of the most expensive functions for the worker are Pickle functions. Pickle is implemented purely in Python, which means that it is interpreted and, consequently, slow. cPickle is a drop-in replacement for Pickle implemented in C and also part of the Python standard library (http://docs.python.org/2/library/pickle.html#module-cPickle). Replacing all usages of Pickle with cPickle resulted in a moderate speedup.
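The substitution is the standard Python 2 idiom shown below. (On Python 3 the fallback branch is taken, and pickle transparently uses its C accelerator where available, so the idiom remains harmless.)

```python
# Prefer the C implementation of Pickle when it exists (Python 2);
# fall back to the plain module otherwise (Python 3, where the C
# accelerator is used automatically inside pickle itself).
try:
    import cPickle as pickle
except ImportError:
    import pickle

# Both implementations produce interchangeable serialized data,
# so this really is a drop-in replacement.
task_descriptor = {"task_id": "t1", "dependencies": ["obj:1", "obj:2"]}
blob = pickle.dumps(task_descriptor)
assert pickle.loads(blob) == task_descriptor
```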

5.4 Logging

Ciel makes extensive use of debug messages which, by default, are not actually written out. While these messages were undoubtedly helpful while implementing Ciel, they have a significant performance overhead: several of the most expensive functions for both the master and the worker are CherryPy logging functions. Due to Python's interpreted nature, these debug messages cannot be compiled out to give zero runtime overhead, but removing them achieved a moderate speedup.
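Although debug calls cannot be compiled out, Python's standard logging module at least defers string formatting until a record is known to be emitted, and expensive arguments can be guarded explicitly. A small illustration (the logger name is arbitrary):

```python
import logging

logger = logging.getLogger("ciel.worker")  # arbitrary example name
logger.setLevel(logging.INFO)              # DEBUG messages are disabled

task = {"id": "t42", "deps": ["obj:1"]}

# Eager: the message string is built even though it is never emitted.
logger.debug("dispatching task %s" % (task,))

# Lazy: logging applies the % substitution only if the record is emitted,
# so a disabled call costs little more than the level check.
logger.debug("dispatching task %s", task)

# For genuinely expensive arguments, guard explicitly:
if logger.isEnabledFor(logging.DEBUG):
    logger.debug("task detail: %s", sorted(task["deps"]))

assert not logger.isEnabledFor(logging.DEBUG)
```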

5.5 struct Usage

In the protocol used between the worker and the executor, the struct module is used to write the length of each JSON message in network byte order. This conversion is specified by a format string, which was being parsed anew for every message sent. A micro-optimization was performed to use the Struct class, which parses the format string once at construction instead of on every message.
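The change amounts to hoisting the format-string parse out of the per-message path. A sketch (CPython also caches compiled formats internally, so the win is modest, which is why the report calls it a micro-optimization):

```python
import struct

# Per-message call: the "!I" format string is (logically) parsed each time.
def frame_naive(body):
    return struct.pack("!I", len(body)) + body

# Precompiled: the format is parsed once, when the Struct is created.
LENGTH_HEADER = struct.Struct("!I")  # 4-byte length, network byte order

def frame_fast(body):
    return LENGTH_HEADER.pack(len(body)) + body

# Both produce identical frames; only the parsing cost differs.
msg = b'{"op": "spawn"}'
assert frame_naive(msg) == frame_fast(msg)
assert frame_fast(msg)[:4] == b"\x00\x00\x00\x0f"  # 15-byte body
```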

5.6 Replacing JSON with Protobufs

JSON-encoded messages are used for communication between the master and the worker and between the worker and the executor. Since the profiling data for both the master and the worker show that JSON encoding is the single most expensive method call, this is a logical place to optimize. One way to do this is to replace the JSON-encoded messages with a fixed, binary format, which should be shorter and result in fewer packets being sent. It should also be faster to encode, since a fixed format does not require introspection of the data structures being sent. A commonly used library for doing this is Protobuf, Google's implementation of binary message encoding (http://code.google.com/p/protobuf/). However, the Python version of the Protobuf library is written entirely in Python rather than using a C or C++ extension module, with the result that it is relatively slow. Replacing the JSON messages with Protobufs for the master-worker task communication resulted in a 34% slowdown for the NumberInc benchmark. Since the messages that are sent are fairly large (an average of 2834 bytes for JSON compared with an average of 2191 bytes for Protobuf) and consist primarily of hashes in string form and base64-encoded references, the smaller size of a binary encoding is not significant. Another disadvantage of the Protobuf encoding is that it is inflexible and requires precompilation of the message types. Consequently, Protobufs were dropped in favour of keeping the JSON-encoded messages.
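Why the binary container helped so little can be seen by measuring where the bytes actually go. In the sketch below, with made-up reference contents standing in for real Ciel messages, almost all of the size is in the hex and base64 strings themselves, which a Protobuf wrapper does not shrink:

```python
import base64
import json
import os

# A made-up reference record, mimicking the hash strings and
# base64-encoded references that dominate the real messages.
digest = os.urandom(32)
ref = {
    "hash": digest.hex(),                               # 64 characters
    "data": base64.b64encode(os.urandom(64)).decode(),  # 88 characters
}

encoded = json.dumps(ref).encode("utf-8")
payload = len(ref["hash"]) + len(ref["data"])
framing = len(encoded) - payload

# The JSON framing is a few dozen bytes; the string payloads dominate,
# so switching the container format saves comparatively little.
print(payload, framing)  # 152 24
```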

Figure 2: The task graph of the NumberInc benchmark.

6 Evaluation

6.1 Benchmarks

A number of different benchmarks were used to evaluate the performance of Ciel.

• Yield is a Scala benchmark which repeatedly forms a continuation and invokes the tailSpawn() method. It should show a substantial improvement since the number of interprocess messages required for the tailSpawn() method was reduced.

• Suspend is a Scala benchmark which repeatedly blocks on a reference. It should show a substantial improvement since the number of messages required for the blockOn() method was reduced.

• IterativeTest is a Scala benchmark which, in the configuration used for this project, runs 500 iterations where each iteration starts two empty tasks at a time and then waits for them to finish.

• IterativeTestJ is a Java benchmark which, in the configuration used for this project, runs 1000 iterations where each iteration starts an empty task and waits for it to finish.

• NumberInc is a Java benchmark which creates a set of chained tasks and waits for the result. An initial task creates a reference and writes out a number to that reference. For each iteration (of which there are 500), the benchmark creates two tasks which take the previous iteration's reference (or the initial task's reference) and return that number incremented by one. A combiner task reads the references created by the two adder tasks and returns the same number. This reference is then used in the next iteration. This results in a long chain of tasks dependent on each other. While still a synthetic benchmark designed to emphasize task startup time, this benchmark is more complicated than the previous benchmarks. The task graph for NumberInc is shown in Figure 2.

Benchmark                 Local Before  Local After  Local %   LAN Before  LAN After  LAN %
Yield                     22.29         9.94         55%       23.35       11.55      51%
Suspend                   37.52         14.60        61%       39.92       17.59      56%
IterativeTest             23.31         10.72        54%       25.91       13.21      49%
IterativeTestJ            23.20         10.19        56%       24.16       12.04      50%
NumberInc                 24.41         11.50        53%       25.85       14.07      46%
SmithWaterman 10 × 10     27.67         26.58        4%        29.31       27.81      5%
SmithWaterman 30 × 30     77.24         68.16        12%       80.85       70.57      13%

Table 1: Results showing the execution time of Ciel in seconds for various benchmarks before and after the changes, as well as the percentage improvement. The tests were performed on a single machine (Local) and across a LAN.

• SmithWaterman 10 × 10 is a Java benchmark which implements the Smith-Waterman sequence alignment algorithm. Two strings of length 100 000 bytes each were used. The number of blocks was 10 × 10.

• SmithWaterman 30 × 30 is the same as the previous benchmark but with the number of blocks changed to 30 × 30.

The first three benchmarks are synthetic benchmarks distributed with Ciel and evaluated in Derek Murray's PhD thesis [7]. The next two were custom-written for this project. The final two benchmarks are part of the example code distributed with the Ciel Java bindings.

6.2 Results

The benchmarks were performed in two environments. Running the master, the worker (and correspondingly, the executor), and client all on the same machine is represented by the Local column in Table 1. Running the master on one machine and the worker and client on another machine is represented by the LAN column in Table 1. In all cases, the machines used Ubuntu Linux 12.04 with Intel Core i5 3570K processors and 8GiB of RAM. In the LAN benchmarks, 100 Mbit/s Ethernet was used. The changes resulted in a significant performance increase for the synthetic benchmarks, averaging a 56% improvement for the Local case and a 50% improvement for the LAN case. The changes had less effect for the real-world SmithWaterman benchmark. For the 10 × 10 benchmark, the performance increase was between 4% and 5%. For the 30 × 30 benchmark, the performance increase was between 12% and 13%.

6.3 Postoptimization Profiling

Repeating the same benchmark used in the original profiling resulted in the benchmark taking 15s to return the result to the client, with the master using 2.2s of CPU time, the worker using 7.8s and the executor using 4s. Adding these CPU times up gives a total of 14s, which means that only 1s, or 6.7% of the total execution time, was lost to latencies. A sample profile for the master, the worker and the executor is shown in Appendix B.

7 Discussion

Analyzing the results shows that the optimizations halved the time that the synthetic benchmarks take to run across a LAN and more than halved the time when run on a single machine. In general, the LAN times are slightly worse overall due to the extra latency in communication between the master and worker. The percentage improvement for the LAN case is slightly lower than for the Local case because the optimized version sends no fewer master-worker messages, and so the increased master-worker latency dominates the overall execution time.

When it comes to the real-world benchmarks, the percentage improvement is much smaller because the actual computation dominates the execution time. However, there is still a definite improvement. There is a greater percentage improvement for the SmithWaterman 30 × 30 benchmark compared with the 10 × 10 because it starts up more tasks and so the task startup time is more of an overhead. This indicates that the optimizations open up the possibility for more parallelism in algorithms: because the overhead of task startup is smaller, more tasks can be started.

It is clear that the optimizations have had an effect on the profiling graphs. The change to cPickle has removed it from the list of 'hot' methods. The removal of debug logging has also removed the CherryPy code from the list of 'hot' methods. The most CPU-consuming activity that remains is the JSON processing, which makes it a logical place to begin further optimization. With fewer messages being exchanged between the worker and the executor during task initialization, postoptimization profiling shows a decrease in the percentage of time spent in the getTask() method in the executor.

8 Conclusions and Future Work

This work has shown that the overhead of starting tasks does not have a single 'hot' spot where all the time is spent; instead, it is made up of a number of components distributed across multiple machines and processes, as well as the latency between these machines and processes. But, by making several changes, it is possible to halve the execution time for synthetic benchmarks and achieve a moderate reduction for a real-world benchmark. Since some algorithms are naturally expressed with little per-task computation, this is an important result. The code has been made available on GitHub: the Ciel repository is at https://github.com/rosslagerwall/ciel and the Java and Scala bindings are available at https://github.com/rosslagerwall/ciel-java.

There are several aspects for future work. First, this work needs to be evaluated on a real-world cluster with tens of machines to better quantify the performance gain. In addition, this may expose other bottlenecks; for example, the master's task scheduling and dispatch may become a bottleneck with many workers. Unfortunately, this evaluation was not possible within the scope of the project. Secondly, looking at the postoptimization profiling, it appears that the JSON encoding and decoding is the biggest performance bottleneck. A Protobuf-like mechanism could be investigated to find a high-performance serialization library which can encode the messages in binary format. One option would be to activate the experimental Protobuf C++ extension which is available in the latest release (http://code.google.com/p/protobuf/source/browse/trunk/CHANGES.txt). A related task would be to reduce the size of the messages to fit within a single network packet. One possibility would be to gzip the messages, although this would have an impact on CPU usage. Another option would be to change what is actually sent in the messages: instead of sending and receiving hexadecimal hash strings and base64-encoded references, these objects could be kept in binary format, reducing their size and consequently the size of the messages that are sent. These changes would require a considerable rework of Ciel but would likely result in another significant improvement in execution time.
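The potential saving from keeping identifiers binary is easy to quantify. Assuming 256-bit object hashes (a plausible size for illustration; the report does not state the actual hash width used by Ciel):

```python
import base64
import os

digest = os.urandom(32)  # assume a 256-bit object hash; the actual
                         # hash width used by Ciel may differ

hex_form = digest.hex()              # 64 bytes on the wire
b64_form = base64.b64encode(digest)  # 44 bytes on the wire
raw_form = digest                    # 32 bytes on the wire

# Hex doubles the size and base64 adds roughly a third, so keeping
# identifiers binary end-to-end shrinks the messages considerably.
print(len(hex_form), len(b64_form), len(raw_form))  # 64 44 32
```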

References

[1] Pramod Bhatotia, Alexander Wieder, Rodrigo Rodrigues, Umut A. Acar, and Rafael Pasquin. Incoop: MapReduce for incremental computations. In Proceedings of the 2nd ACM Symposium on Cloud Computing, SOCC '11, pages 7:1–7:14, New York, NY, USA, 2011. ACM.

[2] Yingyi Bu, Bill Howe, Magdalena Balazinska, and Michael D. Ernst. HaLoop: efficient iterative data processing on large clusters. Proc. VLDB Endow., 3(1-2):285–296, September 2010.

[3] Jeffrey Dean and Sanjay Ghemawat. MapReduce: simplified data processing on large clusters. In Proceedings of the 6th Conference on Symposium on Operating Systems Design & Implementation - Volume 6, OSDI '04, pages 10–10, Berkeley, CA, USA, 2004. USENIX Association.

[4] Michael Isard, Mihai Budiu, Yuan Yu, Andrew Birrell, and Dennis Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. SIGOPS Oper. Syst. Rev., 41(3):59–72, March 2007.

[5] Frank McSherry, Rebecca Isaacs, Michael Isard, and Derek G. Murray. Composable incremental and iterative data-parallel computation with Naiad. Technical report, Microsoft Research, 2012.

[6] Derek G. Murray, Malte Schwarzkopf, Christopher Smowton, Steven Smith, Anil Madhavapeddy, and Steven Hand. Ciel: a universal execution engine for distributed data-flow computing. In Proceedings of the 8th USENIX Conference on Networked Systems Design and Implementation, NSDI '11, pages 9–9, Berkeley, CA, USA, 2011. USENIX Association.

[7] Derek Gordon Murray. A distributed execution engine supporting data-dependent control flow. PhD thesis, University of Cambridge, 2011.


A Original Profiling

A.1 Master

The yappi code profiler was used (http://code.google.com/p/yappi/). It is a deterministic profiler, which means that it counts and records the time of every function call. The following is a sample of the lines from the profiler. Each number listed here is the CPU time in seconds spent in that function, excluding the time spent in functions called from that function.

simplejson/iterencode         0.26
cherrypy/logging/time         0.13
pycurl_main_loop              0.10
job_pool/_report_tasks        0.09
job_pool/_schedule            0.07
cherrypy/__init__/__call__    0.06
cherrypy/logging/error        0.05

A.2 Worker

Profiling for the worker was performed in the same manner as for the master.

simplejson/iterencode         0.66
pickle/save                   0.53
cherrypy/logging/time         0.50
pickle/memoize                0.34
pickle/load                   0.33
proc/json_event_loop          0.24
cherrypy/logging/error        0.20

A.3 Executor

Ciel started two instances of the JVM during the profiling session. The OKTECH profiler was used in sampling mode (http://code.google.com/p/oktech-profiler/). In contrast to the previous two listings, each number listed here is the percentage of time spent in that function.

JVM 1:
|- NumberInc.invoke()            36%
|- Ciel.blockOn()                78%
|- Ciel.spawn()                   6%
|- JsonPipeRpc.getTask()         34%
|- JsonPipeRpc.receiveMessage() 100%
JVM 2:
|- JsonPipeRpc.getTask()         96%
|- JsonPipeRpc.receiveMessage()  95%
|- Reference.fromJson()         0.5%
|- CombinerTask.invoke()        0.4%

B Postoptimization Profiling

Postoptimization profiling was performed in the same manner as the original profiling.

B.1 Master

simplejson/iterencode                       0.27
simplejson/raw_decode                       0.11
references/default                          0.06
scheduling_policy/select_workers_for_task   0.05
job_pool/_report_tasks                      0.05
job_pool/_schedule                          0.04
taskpool/task_from_descriptor               0.04

B.2 Worker

simplejson/iterencode         0.46
simplejson/raw_decode         0.33
posixpath/join                0.15
simplejson/loads              0.14
task_graph/publish            0.12
datastore/create_datavalue    0.12
proc/json_event_loop          0.11

B.3 Executor

JVM 1:
|- NumberInc.invoke()            53%
|- Ciel.blockOn()                83%
|- Ciel.spawn()                   4%
|- JsonPipeRpc.getTask()         20%
|- JsonPipeRpc.receiveMessage() 100%
JVM 2:
|- JsonPipeRpc.getTask()         90%
|- JsonPipeRpc.receiveMessage() 100%
|- AddTask.invoke()               2%