
Scale with Dask

The Planetary Computer Hub is a JupyterHub deployment paired with Dask Gateway, which makes it easy to create Dask clusters for distributing your computation across many machines.

Creating a cluster

Use dask_gateway.GatewayCluster to quickly create a Dask cluster.

[1]:
import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.scale(4)
cluster
GatewayCluster<staging.549920ff1ac94c1aa92f9d543147c8a4, status=running>

Don’t forget the ``client = cluster.get_client()`` line. That’s what actually ensures the cluster will be used for your Dask computations. Otherwise, you’ll end up using Dask’s local scheduler, which runs the computation using multiple threads on a single machine rather than on the cluster. When you’re using a cluster, always keep an eye on the Dashboard (more below). If you aren’t seeing any tasks in the dashboard, you might have forgotten to create a Dask client.
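To see the difference, note that Dask works even without any cluster at all. A minimal sketch (using a small in-memory Dask array, not the Hub's gateway): with no distributed client registered, `.compute()` falls back to the local threaded scheduler and runs in your notebook's process.

```python
import dask.array as da

# No distributed client has been created, so this computation runs on
# the local threaded scheduler -- multiple threads, one machine.
x = da.ones((1000, 1000), chunks=(250, 250))
total = x.sum().compute()
print(total)  # 1000000.0
```

Once `client = cluster.get_client()` has run, the same `.compute()` call is routed to the cluster's workers instead.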

Open the dashboard

The Dask Dashboard provides invaluable information on the activity of your cluster. Clicking the “Dashboard” link above will open the Dask dashboard in a new browser tab.

Dask Dashboard in a new tab.

We also include the dask-labextension for laying out the Dask dashboard as tabs in the JupyterLab workspace.

Dask Dashboard in JupyterLab.

To use the dask-labextension, copy the “Dashboard” address from the cluster repr, click the orange Dask logo on the left-hand navigation bar, and paste in the dashboard address.

You can close your cluster, freeing up its resources, by calling cluster.close().

[2]:
cluster.close()
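GatewayCluster also works as a context manager, which closes the cluster automatically when the block exits. Here is a sketch of the same pattern using a local in-process client (`distributed.Client` is an assumption here, standing in for `dask_gateway.GatewayCluster` so the snippet runs anywhere):

```python
from distributed import Client

# The context manager closes the client (and its local cluster) on exit,
# freeing resources -- the same pattern works with GatewayCluster.
with Client(processes=False, dashboard_address=None) as client:
    result = client.submit(sum, range(10)).result()
    print(result)  # 45
# At this point the client is closed.
```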

Autoscale the cluster to your workload

Dask clusters can automatically adapt their size based on your workload. Use cluster.adapt(minimum, maximum) to enable adaptive mode.

[3]:
import dask_gateway

cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
cluster.adapt(minimum=2, maximum=50)
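Adaptive mode also works with a local cluster, which can be handy for testing the pattern outside the Hub. A sketch using `distributed.LocalCluster` (an assumption; on the Hub you would call `adapt` on the GatewayCluster as above):

```python
from distributed import LocalCluster, Client

# A small in-process cluster; adapt() lets it grow up to 4 workers
# if the workload demands it, and shrink back down when idle.
cluster = LocalCluster(n_workers=1, processes=False, dashboard_address=None)
cluster.adapt(minimum=1, maximum=4)
client = Client(cluster)

result = client.submit(lambda n: n * 2, 21).result()
print(result)  # 42

client.close()
cluster.close()
```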

Dask will add workers as necessary when a computation is submitted. As an example, we’ll compute the minimum daily temperature averaged over all of Hawaii, using the Daymet dataset.

[4]:
import pystac_client
import planetary_computer
import xarray as xr

account_name = "daymeteuwest"
container_name = "daymet-zarr"

catalog = pystac_client.Client.open(
    "https://planetarycomputer.microsoft.com/api/stac/v1",
    modifier=planetary_computer.sign_inplace,
)
asset = catalog.get_collection("daymet-daily-hi").assets["zarr-abfs"]

ds = xr.open_zarr(
    asset.href,
    **asset.extra_fields["xarray:open_kwargs"],
    storage_options=asset.extra_fields["xarray:storage_options"]
)
ds
[4]:
<xarray.Dataset>
Dimensions:                  (nv: 2, time: 14965, x: 284, y: 584)
Coordinates:
    lat                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
    lon                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
  * time                     (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...
  * x                        (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06
  * y                        (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05
Dimensions without coordinates: nv
Data variables:
    dayl                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    lambert_conformal_conic  int16 ...
    prcp                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    srad                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    swe                      (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    time_bnds                (time, nv) datetime64[ns] dask.array<chunksize=(365, 2), meta=np.ndarray>
    tmax                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    tmin                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    vp                       (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    yearday                  (time) int16 dask.array<chunksize=(365,), meta=np.ndarray>
Attributes:
    Conventions:       CF-1.6
    Version_data:      Daymet Data Version 4.0
    Version_software:  Daymet Software Version 4.0
    citation:          Please see http://daymet.ornl.gov/ for current Daymet ...
    references:        Please see http://daymet.ornl.gov/ for current informa...
    source:            Daymet Software Version 4.0
    start_year:        1980

The .compute() call in the next cell is what triggers computation and causes Dask to scale the cluster up to a dozen or so workers.

[5]:
timeseries = ds["tmin"].mean(dim=["x", "y"]).compute()

import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(12, 6))
timeseries.plot(ax=ax);
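The reduction itself is ordinary xarray: averaging over the spatial dimensions collapses a `(time, y, x)` cube to a 1-D time series. A small synthetic sketch of the same pattern (hypothetical values, just to show the shape of the result):

```python
import numpy as np
import xarray as xr

# A tiny stand-in for ds["tmin"]: 4 time steps on a 2x3 spatial grid.
tmin = xr.DataArray(
    np.arange(24.0).reshape(4, 2, 3),
    dims=["time", "y", "x"],
    name="tmin",
)

# Averaging over x and y leaves only the time dimension.
ts = tmin.mean(dim=["x", "y"])
print(ts.dims, ts.shape)  # ('time',) (4,)
```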
[6]:
cluster.close()

Customize your cluster

dask_gateway.GatewayCluster creates a cluster with some default settings, which might not be appropriate for your workload. For example, we might have a memory-intensive workload which requires more memory per CPU core. Or we might need to set environment variables on the workers.

To customize your cluster, create a Gateway object and then customize the options.

[7]:
import dask_gateway

gateway = dask_gateway.Gateway()
cluster_options = gateway.cluster_options()

Cluster options.

In a Jupyter Notebook, you can use the HTML widget to customize the options, or you can set the values programmatically in Python. We’ll ask for 16 GiB of memory per worker.

[8]:
cluster_options["worker_memory"] = 16

Now create your cluster. Make sure to pass the ``cluster_options`` object to ``gateway.new_cluster``.

[9]:
cluster = gateway.new_cluster(cluster_options)
client = cluster.get_client()
cluster.scale(2)

Learn more

The Dask documentation has much more information on using Dask for scalable computing. This JupyterHub deployment uses Dask Gateway to manage creating Dask clusters.