ML tasks such as distributed training and batch inference stretch the abstractions of modern data processing systems, leading to performance or learning efficiency tradeoffs. In this talk we introduce Ray Dataset, a universal compatibility layer built on Arrow and Python that allows data processing to be combined with ML pipelines without such tradeoffs.
As modern machine learning techniques have grown in popularity, it has become increasingly important to scale them and integrate them with existing machine learning pipelines. Ray is a distributed system powerful enough to solve the technical challenges at the various stages of these pipelines, while providing an interface and an ecosystem of libraries and integrations that allow Python developers to keep using their favorite, familiar tools.
Consider the ML pipeline for a typical deep learning model. First, raw data must be ingested and preprocessed. Typically this data is stored in a distributed storage system like HDFS or S3. It is then preprocessed by a job that uses an ETL system like Spark, and the output is written back to a storage system. Finally, a new job reads the preprocessed data and uses a machine learning framework like PyTorch, TensorFlow, or Horovod to actually train the model (this step may be repeated many times for hyperparameter tuning).
While this approach works with traditional technologies, a few issues can make it extremely inefficient. Because these jobs usually run directly on a cluster manager like Kubernetes, they are typically limited to communicating via storage systems, which requires writing the data to disk. Communication within or between jobs may also incur additional serialization/deserialization overhead. In many jobs, this unnecessary overhead can dominate the CPU utilization of a cluster. Finally, because these technologies were developed in different communities, they are often difficult to stitch together, resulting in one-off “glue”, awkward conversions, ad-hoc orchestration code, and other usability shortcomings.
Ray aims to solve these problems with a two-fold approach: (1) providing a powerful, general-purpose distributed computing system capable of solving the technical challenges in these frameworks, and (2) providing an ecosystem of high-level libraries and integrations that are easy to use and scale without requiring the user to have distributed computing expertise.
Ray’s architecture has a few key components that allow it to solve these challenges. It uses a decentralized, per-node scheduler to schedule fine-grained tasks while avoiding the challenges that centralized schedulers face. It also includes a distributed, shared-memory object store that allows processes to share objects like NumPy arrays and tensors without expensive serialization and copying. It can also perform the complex shuffles needed for data preprocessing operations. It exposes an interface that allows arbitrary Python functions to execute on a cluster and arbitrary state (including hardware such as GPUs) to be managed using Python classes. It can also manage the allocation of hardware resources such as CPUs, GPUs, and TPUs.
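To make the shared-memory idea concrete, here is a minimal single-machine sketch that uses only the Python standard library. It is not Ray's object store or its API: the helper names put_floats and sum_from_segment are hypothetical, and the reader attaches from the same process for brevity where a real setup would attach from a separate worker process. The point it illustrates is that a consumer can locate a buffer by name and read it in place, without a pickle round trip or a trip through disk.

```python
import array
from multiprocessing import shared_memory


def put_floats(values):
    """Copy a non-empty sequence of floats into a new shared-memory segment (one copy)."""
    data = array.array("d", values)
    segment = shared_memory.SharedMemory(create=True, size=data.itemsize * len(data))
    # Typed view over the raw bytes; released on exit so the segment can be closed later.
    with segment.buf.cast("d") as view:
        view[: len(data)] = data
    return segment, len(data)


def sum_from_segment(name, count):
    """Attach to an existing segment by name and read the floats in place (no pickling)."""
    segment = shared_memory.SharedMemory(name=name)
    try:
        with segment.buf.cast("d") as view:
            return sum(view[i] for i in range(count))
    finally:
        segment.close()  # detach this handle; the data itself stays alive


# Producer side: place the data in shared memory once.
segment, count = put_floats([1.5, 2.5, 3.0])
try:
    # Consumer side: only the segment name and element count need to be passed along.
    total = sum_from_segment(segment.name, count)
finally:
    segment.close()
    segment.unlink()  # free the segment once no reader needs it
```

Only a short name and a length cross the boundary between producer and consumer, which is the property that lets an object store hand large arrays to many workers on the same node cheaply.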
Ray’s ecosystem of libraries and integrations makes it easy to leverage the power of Ray without becoming a distributed systems expert. For example, distributed libraries like Modin provide the familiar pandas API but scale to a cluster, while integrations like Dask-on-Ray and RayDP allow you to use Dask Distributed or PySpark on Ray for data preprocessing. Integrations with ML training libraries like Horovod, sklearn, and XGBoost allow users to use existing ML frameworks with Ray. Finally, to move data from preprocessing to training, Ray Dataset provides a universal, distributed data frame representation that makes conversion from preprocessing frameworks to training frameworks easy.
Put together, a developer can set Ray as their backend (with one line of code), keep using Dask for data processing, convert their Dask DataFrame to a Ray Dataset (with one line of code), then pass their dataset shards directly into distributed TensorFlow for training (with one line of code). The entire process requires changing only a few lines of code and uses Ray Dataset to avoid expensive and unnecessary writes to disk.
Ray Dataset can also be used to do simple data processing operations in a Ray-native way. For example, scoring a deep learning model using GPUs is as simple as from_parquet().map(preprocess).map(cnn, num_gpus=1). Because the library provides such a simple interface for using Ray, it is easy to forget how difficult these operations are in other systems, which would typically require thinking about IO performance, backpressure, resource configurations, etc.
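The shape of that chained call can be pictured with a toy, single-machine stand-in written with only the Python standard library. This is a sketch under stated assumptions, not Ray Dataset's implementation or API: ToyDataset, from_items, take_all, and the placeholder preprocess and model functions are all hypothetical, blocks live in local memory, and a thread pool stands in for cluster workers. The toy has no counterpart to the num_gpus resource request.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyDataset:
    """Hypothetical stand-in for a distributed dataset: a list of in-memory blocks."""

    def __init__(self, blocks):
        self.blocks = [list(block) for block in blocks]

    @classmethod
    def from_items(cls, items, num_blocks=2):
        items = list(items)
        # Round-robin split; a real system would split by file or row range.
        return cls([items[i::num_blocks] for i in range(num_blocks)])

    def map(self, fn, max_workers=2):
        """Apply fn to every record, one block per worker, and return a new dataset."""

        def run_block(block):
            return [fn(record) for record in block]

        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # pool.map preserves block order; results are collected before the pool exits.
            return ToyDataset(pool.map(run_block, self.blocks))

    def take_all(self):
        """Flatten the blocks back into a single list of records."""
        return [record for block in self.blocks for record in block]


def preprocess(x):
    return x / 10.0  # placeholder for decoding or normalizing a record


def model(x):
    return 1 if x >= 0.5 else 0  # placeholder for model inference


scores = ToyDataset.from_items(range(10)).map(preprocess).map(model).take_all()
```

Each map returns a new dataset, which is what makes the stages chainable. The toy deliberately leaves out everything the paragraph above lists as hard: IO performance, backpressure, and resource configuration.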
This talk focuses on unifying the data processing and training steps of a machine learning pipeline, but the same theme holds for other parts of the machine learning ecosystem, with libraries like Tune for hyperparameter tuning, Serve for live model serving, and RLlib for reinforcement learning.