Open in Colab: https://colab.research.google.com/github/casangi/graph_viper/blob/master/docs/graph_building_tutorial.ipynb


GraphVIPER Tutorial

This tutorial shows how GraphVIPER can be used to build Dask graphs by mapping a dictionary-based container of xarray.Datasets to Dask graph nodes, followed by a reduction step. The dictionary of xarray.Datasets used in this tutorial is referred to as a Processing Set, although any dictionary containing xarray.Datasets can be used. The GraphVIPER map and reduce functions can be thought of as a generalization of xarray.map_blocks that can be applied to more than one xarray.Dataset. Both map and reduce build Dask graphs using dask.delayed.

The following types of mapping are supported:

  • Partitions defined by any combination of the coordinates in the Processing Set.

  • More than one xarray.Dataset can be assigned to a single mapping node.

  • xarray.Dataset partitions assigned to different nodes can have coordinates that overlap.

The tutorial will cover the following examples:

  • Frequency Map Reduce: This example explains the concepts of parallel_coords and node_task_data_mapping that define parallelism.

  • Overlapping Frequency Map Reduce.

  • Baseline and Frequency Map Reduce.

  • Time Map Reduce.

GraphVIPER provides improvements over the CNGI prototype:

  • There is a clear separation between the concurrency layer (GraphVIPER) and the domain layer (science code, AstroVIPER).

  • The memory backpressure issue was solved by incorporating the loading of data into the compute nodes. An example of the memory backpressure issue is cube imaging where large in-memory image cubes have to be created, which Dask is not aware of, causing Dask to be overeager in loading data from disk into memory. In the future, Dask might provide an alternative solution where graph nodes can be annotated with expected memory usage.

  • The number of graph nodes has been minimized; this was also achieved by incorporating the loading of data into the compute nodes. When Dask-backed xarray datasets are used, a node is created for each data variable, and since radio astronomy datasets have numerous data variables, this leads to a bloated graph that degrades scaling performance.

  • Multiple xarray.Datasets can be processed together with overlap. This cannot be done with the current Xarray functionality, such as xarray.map_blocks.

  • Using a Dask plugin, the Dask Scheduler has been modified so that data can be cached to a local disk when multiple passes over larger-than-memory data have to be done. This reduces clustered file system or binary object store access (see GraphVIPER Client).

Install GraphVIPER

[1]:
import os

from importlib.metadata import version

try:
    import graphviper

    print("GraphVIPER version", version("graphviper"), "already installed.")
except ImportError as e:
    print(e)
    print("Installing GraphVIPER")

    os.system("pip install graphviper")

    import graphviper

    print("GraphVIPER version", version("graphviper"), " installed.")
GraphVIPER version 0.0.6 already installed.

Setup Dask Cluster

To simplify things, we are going to start off by using a single process (everything will run in serial).

[2]:
# Start the client in serial mode (a single process, no parallelism).
from graphviper.dask.client import local_client

# To start a local Dask cluster with two workers instead, use:
# viper_client = local_client(cores=2, memory_limit="4GB", autorestrictor=True)
viper_client = local_client(serial_execution=True)
viper_client
[2024-04-10 13:45:14,954]     INFO  graphviper:  Checking parameter values for client.local_client
[2024-04-10 13:45:14,954]     INFO  graphviper:  Module path: /users/jsteeb/graphviper/
[2024-04-10 13:45:14,954]     INFO  graphviper:  Searching /users/jsteeb/graphviper/src/graphviper/config/ for configuration file, please wait ...
[2024-04-10 13:45:14,960]  WARNING      client:  It is recommended that the local cache directory be set using the dask_local_dir parameter.
[2024-04-10 13:45:14,960]     INFO      client:  Running client in synchronous mode.

Download and Convert Dataset

[3]:
from graphviper.utils.data import download

download(file="Antennae_North.cal.lsrk.split.ms")

from xradio.vis.convert_msv2_to_processing_set import convert_msv2_to_processing_set

# The chunksize on disk. Chunksize can be specified for any of the following dimensions :
# time, baseline_id (interferometer) / antenna_id (single dish), frequency, and polarization.
chunks_on_disk = {"frequency": 3}
infile = "Antennae_North.cal.lsrk.split.ms"
outfile = "Antennae_North.cal.lsrk.split.vis.zarr"
convert_msv2_to_processing_set(
    in_file=infile,
    out_file=outfile,
    parallel=False,
    overwrite=True,
    main_chunksize=chunks_on_disk,
)
[2024-04-10 13:45:14,967]     INFO      client:  File exists: Antennae_North.cal.lsrk.split.ms

Inspect the Processing Set

read_processing_set is a lazy function, so no data is loaded into memory; only metadata is loaded (load_processing_set, by contrast, loads everything into memory). Metadata is defined as everything that is not an xarray.DataArray. Note that GraphVIPER does not require a Processing Set; any dictionary of xarray.Datasets can be used.

[4]:
import pandas as pd

pd.options.display.max_colwidth = 100
ps_store = "Antennae_North.cal.lsrk.split.vis.zarr"

from xradio.vis.read_processing_set import read_processing_set

intents = ["OBSERVE_TARGET#ON_SOURCE"]
fields = None
ps = read_processing_set(
    ps_store=ps_store,  # reuse the path defined above
    intents=intents,
    fields=fields,
)
display(ps.summary())
   name                                                                            ddi  intent                    field_id  field_name                start_frequency  end_frequency  shape           field_coords
0  Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0  0    OBSERVE_TARGET#ON_SOURCE  0         NGC4038 - Antennae North  3.439281e+11     3.440067e+11   (45, 64, 8, 2)  [fk5, 12h01m52.43086055s, -18d52m02.92003728s]
1  Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1  0    OBSERVE_TARGET#ON_SOURCE  1         NGC4038 - Antennae North  3.439281e+11     3.440067e+11   (35, 64, 8, 2)  [fk5, 12h01m52.95871137s, -18d52m02.92s]
2  Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2  0    OBSERVE_TARGET#ON_SOURCE  2         NGC4038 - Antennae North  3.439281e+11     3.440067e+11   (35, 64, 8, 2)  [fk5, 12h01m53.48656218s, -18d52m02.91996277s]

Inspect a single MS v4

The xarray.Datasets within a Processing Set are called Measurement Set v4 (MS v4).

[5]:
ms_xds = ps[
    "Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0"
]
ms_xds
[5]:
<xarray.Dataset> Size: 716kB
Dimensions:                     (time: 45, baseline_id: 64, frequency: 8,
                                 polarization: 2, uvw_label: 3)
Coordinates:
    baseline_antenna1_id        (baseline_id) int32 256B dask.array<chunksize=(64,), meta=np.ndarray>
    baseline_antenna2_id        (baseline_id) int32 256B dask.array<chunksize=(64,), meta=np.ndarray>
  * baseline_id                 (baseline_id) int64 512B 0 1 2 3 ... 60 61 62 63
  * frequency                   (frequency) float64 64B 3.439e+11 ... 3.44e+11
  * polarization                (polarization) <U2 16B 'XX' 'YY'
  * time                        (time) float64 360B 1.307e+09 ... 1.307e+09
  * uvw_label                   (uvw_label) <U1 12B 'u' 'v' 'w'
Data variables:
    EFFECTIVE_INTEGRATION_TIME  (time, baseline_id) float64 23kB dask.array<chunksize=(45, 64), meta=np.ndarray>
    FLAG                        (time, baseline_id, frequency, polarization) bool 46kB dask.array<chunksize=(45, 64, 3, 2), meta=np.ndarray>
    TIME_CENTROID               (time, baseline_id) float64 23kB dask.array<chunksize=(45, 64), meta=np.ndarray>
    UVW                         (time, baseline_id, uvw_label) float64 69kB dask.array<chunksize=(45, 64, 3), meta=np.ndarray>
    VISIBILITY                  (time, baseline_id, frequency, polarization) complex64 369kB dask.array<chunksize=(45, 64, 3, 2), meta=np.ndarray>
    WEIGHT                      (time, baseline_id, frequency, polarization) float32 184kB dask.array<chunksize=(45, 64, 3, 2), meta=np.ndarray>
Attributes:
    data_groups:  {'base': {'flag': 'FLAG', 'uvw': 'UVW', 'visibility': 'VISI...
    ddi:          0
    intent:       OBSERVE_TARGET#ON_SOURCE
    antenna_xds:  <xarray.Dataset> Size: 3kB\nDimensions:        (antenna_id:...

Nomenclature

  • input_data: A dictionary of xarray.Datasets or a processing_set.

  • n_datasets: The number of xarray.Datasets in the input_data.

  • dim_i: The name of the \(\text{i}^{\text{th}}\) dimension.

  • n_dims: The number of dimensions over which parallelism will occur.

  • n_dim_i_chunks: Number of chunks into which the dimension coordinate dim_i has been divided.

  • n_nodes: Number of nodes in the mapping stage of a MapReduce graph.

  • _{}: If curly brackets are preceded by an underscore, it indicates a subscript and not a dictionary value.

How Graph Parallelism is Specified: parallel_coords

The parallel_coords is a dictionary where the keys are dimensions over which parallelism will occur and can be any of the dimension coordinate names present in the input data. For the MS v4 xarray.Dataset, the options include time, baseline_id (interferometer) / antenna_id (single dish), frequency, and polarization. Each dimension coordinate name is associated with a dictionary that describes the data selection for that dimension in each node of the mapping stage of the graph.

The structure of parallel_coords:

parallel_coords = {
    dim_0: {
        'data': 1D list/np.ndarray of Number,
        'data_chunks': {
            0 : 1D list/np.ndarray of Number,
            ⋮
            n_dim_0_chunks-1 : ...,
        },
        'data_chunks_edges': 1D list/np.ndarray of Number,
        'dims': (dim_0,),
        'attrs': measure attribute,
    }
    ⋮
    dim_{n_dims-1}: ...
}

The keys of each dim_i dictionary have the following meanings:

  • data: An array containing all the coordinate values associated with that dimension. These values do not necessarily have to match the values in the coordinates of the input data, as those are interpolated onto these values. The minimum and maximum values can be respectively larger or smaller than the values in the coordinates of individual xarray.Datasets; this will simply exclude that data from being processed. It’s important to note that the parallel_coords and the input data coordinates must have the same measures attributes (reference frame, units, etc.).

  • data_chunks: A dictionary where the values are chunks of the data and the keys are integers. This chunking determines the parallelism of the graph. The values in the chunks can overlap.

  • data_chunks_edges: An array with the start and end values of each chunk.

  • dims: The dimension coordinate name.

  • attrs: The XRADIO measures attributes of the data (refer to XRADIO documentation).

The combinations of all the chunks in parallel_coords determine the parallelism of the graph. For example, if you have parallel_coords with 5 time and 3 frequency chunks, you would have 15-way parallelism (5x3).

This description may seem somewhat convoluted, but the following examples should help clarify things.
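As a small illustration of how chunk combinations translate into mapping nodes, the following sketch enumerates the chunk indices for the 5 time by 3 frequency case mentioned above. It uses only the Python standard library; the chunk counts are the hypothetical ones from that example, and the ordering of the resulting tuples is an assumption rather than a statement about how GraphVIPER numbers its nodes.

```python
from itertools import product

# Hypothetical chunk counts: 5 time chunks and 3 frequency chunks.
n_chunks_per_dim = {"time": 5, "frequency": 3}

parallel_dims = list(n_chunks_per_dim)

# One mapping node per combination of chunk indices (Cartesian product).
chunk_indices_per_node = list(
    product(*(range(n) for n in n_chunks_per_dim.values()))
)

print(parallel_dims)                # ['time', 'frequency']
print(len(chunk_indices_per_node))  # 15
print(chunk_indices_per_node[:4])   # [(0, 0), (0, 1), (0, 2), (1, 0)]
```

Each tuple plays the role of chunk_indices for one node, with one index per parallel dimension.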

Frequency Map Reduce

Create Parallel Coordinates

GraphVIPER offers a convenient function, make_parallel_coord, that converts any XRADIO measure into a parallel_coord. In this case, we will use the frequency coordinate of one of the datasets in the processing_set. It’s worth noting that all datasets in this processing_set have the same frequency coordinates but differing time coordinates. This is because they represent the same spectral window but different fields in a mosaic observation.

[6]:
from graphviper.graph_tools.coordinate_utils import make_parallel_coord
from graphviper.utils.display import dict_to_html
from IPython.display import HTML, display

parallel_coords = {}
n_chunks = 3
parallel_coords["frequency"] = make_parallel_coord(
    coord=ms_xds.frequency, n_chunks=n_chunks
)
display(HTML(dict_to_html(parallel_coords["frequency"])))
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11 3.43961791e+11 3.43973023e+11 3.43984254e+11 3.43995486e+11 3.44006717e+11]
data_chunks
0: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
1: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
2: [3.43995486e+11 3.44006717e+11]
data_chunks_edges: [343928096685.9587, 343950559663.9216, 343961791152.903, 343984254130.8659, 343995485619.84735, 344006717108.8288]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']

The display of the frequency parallel_coords clearly shows how the data was split into 3 chunks. All the chunks must have the same number of values, except the last chunk, which can have fewer. GraphVIPER also has convenience functions that create frequency and time coordinate measures:

[7]:
from graphviper.graph_tools.coordinate_utils import make_frequency_coord

n_chunks = 3

coord = make_frequency_coord(
    freq_start=343928096685.9587,
    freq_delta=11231488.981445312,
    n_channels=8,
    velocity_frame="lsrk",
)
parallel_coords["frequency"] = make_parallel_coord(
    coord=coord, n_chunks=n_chunks
)
display(HTML(dict_to_html(parallel_coords["frequency"])))
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11 3.43961791e+11 3.43973023e+11 3.43984254e+11 3.43995486e+11 3.44006717e+11]
data_chunks
0: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
1: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
2: [3.43995486e+11 3.44006717e+11]
data_chunks_edges: [343928096685.9587, 343950559663.9216, 343961791152.903, 343984254130.8659, 343995485619.84735, 344006717108.8288]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
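The chunking rule described above (equal-sized chunks, with a possibly shorter last chunk) can be sketched in plain Python. This is not the implementation of make_parallel_coord; split_into_chunks and chunk_edges are hypothetical helpers written for illustration, and the frequency values are rebuilt from the start frequency and channel width used in this tutorial.

```python
import math


def split_into_chunks(values, n_chunks):
    """Hypothetical helper: split values into contiguous chunks.

    Every chunk holds ceil(len(values) / n_chunks) values, except the last
    one, which may hold fewer.
    """
    if not values or n_chunks < 1:
        raise ValueError("values must be non-empty and n_chunks must be >= 1")
    chunk_size = math.ceil(len(values) / n_chunks)
    chunks = {}
    for start in range(0, len(values), chunk_size):
        chunks[len(chunks)] = values[start:start + chunk_size]
    return chunks


def chunk_edges(chunks):
    """Hypothetical helper: first and last value of each chunk, flattened."""
    edges = []
    for chunk in chunks.values():
        edges.extend([chunk[0], chunk[-1]])
    return edges


# Eight channels, built from the start frequency and channel width above.
freq = [343928096685.9587 + i * 11231488.981445312 for i in range(8)]

data_chunks = split_into_chunks(freq, n_chunks=3)
data_chunks_edges = chunk_edges(data_chunks)

print([len(chunk) for chunk in data_chunks.values()])  # [3, 3, 2]
print(len(data_chunks_edges))                          # 6
```

The edges list follows the same layout as data_chunks_edges in the displayed output: a start and an end value for each of the three chunks.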

Create Node Task Data Mapping

Now, the coordinates in the input data must be mapped onto the parallel_coords. This is achieved using the interpolate_data_coords_onto_parallel_coords function, which produces the node_task_data_mapping. It is a dictionary where each key is a node id of one of the nodes in the mapping stage of the graph.

Structure of node_task_data_mapping:

node_task_data_mapping = {
    0 : {
        'chunk_indices': tuple of int,
        'parallel_dims': (dim_0, ..., dim_{n_dims-1}),
        'data_selection': {
                dataset_name_0: {
                        dim_0: slice,
                        ⋮
                        dim_{n_dims-1}: slice
                }
                ⋮
                dataset_name_{n_datasets-1}: ...
        },
        'task_coords': {
            dim_0: {
                'data': list/np.ndarray of Number,
                'dims': str,
                'attrs': measure attribute,
            }
            ⋮
            dim_{n_dims-1}: ...
        }
    }
    ⋮
    n_nodes-1 : ...
}

Each node_id dictionary has keys with the following meanings:

  • chunk_indices: The indices assigned to the data chunks in the parallel_coords. There must be one index for each dimension in parallel_dims.

  • parallel_dims: The dimension coordinates over which parallelism will occur.

  • data_selection: A dictionary where the keys are the names of the datasets in the processing_set, and the values are dictionaries with the coordinates and accompanying slices. If a coordinate is not included, all values will be selected.

  • task_coords: The chunk of the parallel_coord that is assigned to this node.

[8]:
from graphviper.graph_tools.coordinate_utils import (
    interpolate_data_coords_onto_parallel_coords,
)

node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
    parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
0
chunk_indices: (0,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(0, 3, None)
task_coords
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
1
chunk_indices: (1,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(3, 6, None)
task_coords
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
2
chunk_indices: (2,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(6, 8, None)
task_coords
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
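To make the data_selection slices above concrete, here is a simplified sketch of how a chunk of coordinate values can be turned into an index slice for each dataset. It is only an approximation of the idea: it selects the values that fall inside the range of the chunk and assumes sorted coordinates, whereas interpolate_data_coords_onto_parallel_coords performs an interpolation. The helper chunk_to_slice and the dataset names are hypothetical.

```python
from bisect import bisect_left, bisect_right


def chunk_to_slice(coord_values, chunk_values):
    """Hypothetical helper: slice of coord_values inside the chunk's range.

    Assumes coord_values is sorted in ascending order. Returns None when no
    coordinate value falls inside [min(chunk_values), max(chunk_values)].
    """
    start = bisect_left(coord_values, min(chunk_values))
    stop = bisect_right(coord_values, max(chunk_values))
    return slice(start, stop) if stop > start else None


# Eight channels shared by three made-up datasets.
freq = [343928096685.9587 + i * 11231488.981445312 for i in range(8)]
dataset_coords = {"field_0": freq, "field_1": freq, "field_2": freq}
data_chunks = {0: freq[0:3], 1: freq[3:6], 2: freq[6:8]}

node_task_data_mapping = {}
for node_id, chunk in data_chunks.items():
    node_task_data_mapping[node_id] = {
        "chunk_indices": (node_id,),
        "parallel_dims": ["frequency"],
        "data_selection": {
            name: {"frequency": chunk_to_slice(coords, chunk)}
            for name, coords in dataset_coords.items()
        },
        "task_coords": {"frequency": {"data": chunk, "dims": "frequency"}},
    }

for node_id, node in node_task_data_mapping.items():
    print(node_id, node["data_selection"]["field_0"]["frequency"])
```

For this toy input the slices are slice(0, 3), slice(3, 6) and slice(6, 8), matching the structure of the mapping displayed above.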

Create a Chunk Function and Map Graph

The map function combines a node_task_data_mapping and a node_task to create the map portion of the graph. The node_task must be a function that takes a single dictionary as input and returns a single output, as my_func does in the example below. The map function will pass the input_params dictionary to the node_task and add the following items from the node_task_data_mapping:

  • chunk_indices

  • parallel_dims

  • data_selection

  • task_coords

  • task_id

If local caching is enabled, the following will also be included in the input_params dictionary:

  • date_time

  • viper_local_dir
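The way each node receives its own copy of input_params, extended with the entries from the node_task_data_mapping, can be sketched as follows. This is a conceptual illustration under stated assumptions, not GraphVIPER's map implementation: build_node_params is a hypothetical helper, the dataset name "xds_0" is made up, and the local-caching entries are left out.

```python
import copy


def build_node_params(input_params, node_task_data_mapping):
    """Hypothetical helper: build one parameter dictionary per mapping node."""
    params_per_node = []
    for task_id, node_mapping in node_task_data_mapping.items():
        node_params = copy.deepcopy(input_params)  # user-supplied values
        # Adds chunk_indices, parallel_dims, data_selection and task_coords.
        node_params.update(node_mapping)
        node_params["task_id"] = task_id
        params_per_node.append(node_params)
    return params_per_node


def my_func(input_params):
    # A node task takes a single dictionary and returns a single output.
    return input_params["test_input"], input_params["data_selection"]


# Toy mapping with two nodes.
node_task_data_mapping = {
    0: {
        "chunk_indices": (0,),
        "parallel_dims": ["frequency"],
        "data_selection": {"xds_0": {"frequency": slice(0, 3)}},
        "task_coords": {"frequency": {"data": [0, 1, 2], "dims": "frequency"}},
    },
    1: {
        "chunk_indices": (1,),
        "parallel_dims": ["frequency"],
        "data_selection": {"xds_0": {"frequency": slice(3, 6)}},
        "task_coords": {"frequency": {"data": [3, 4, 5], "dims": "frequency"}},
    },
}

params_per_node = build_node_params({"test_input": 42}, node_task_data_mapping)
results = [my_func(params) for params in params_per_node]
print(results)
```

Here the node tasks are called in a plain loop; in GraphVIPER each call becomes a node in the Dask graph instead.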

[9]:
%load_ext autoreload
%autoreload 2

from graphviper.graph_tools.map import map
from graphviper.graph_tools.generate_dask_workflow import generate_dask_workflow
import dask
from graphviper.utils.display import dict_to_html
from IPython.display import display, HTML


def my_func(input_params):
    display(HTML(dict_to_html(input_params)))

    print("*" * 30)
    return input_params["test_input"]


input_params = {}
input_params["test_input"] = 42

viper_graph = map(
    input_data=ps,
    node_task_data_mapping=node_task_data_mapping,
    node_task=my_func,
    input_params=input_params,
)

dask_graph = generate_dask_workflow(viper_graph)
dask.visualize(dask_graph, filename="map_graph")
[9]:
_images/graph_building_tutorial_22_0.png
[10]:
display(HTML(dict_to_html(viper_graph)))
map
node_task:
input_params: [{'test_input': 42, 'chunk_indices': (0,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(0, 3, None)}}, 'task_coords': {'frequency': {'data': array([3.43928097e+11, 3.43939328e+11, 3.43950560e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 0, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (1,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(3, 6, None)}}, 'task_coords': {'frequency': {'data': array([3.43961791e+11, 3.43973023e+11, 3.43984254e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 1, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (2,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(6, 8, None)}}, 'task_coords': {'frequency': {'data': array([3.43995486e+11, 3.44006717e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 
'velocity_frame': 'lsrk'}}}, 'task_id': 2, 'input_data': None, 'date_time': None}]
[11]:
dask_graph
[11]:
[Delayed('my_func-60247f6f-43db-4450-93c4-7573d4c4d4df'),
 Delayed('my_func-07d2a127-e064-4191-a9e1-a87815b6ba2f'),
 Delayed('my_func-59e3d7dc-91de-4bd5-a1df-cb12b6224a56')]

Run Map Graph

[12]:
dask.compute(dask_graph)
test_input: 42
chunk_indices: (0,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(0, 3, None)
task_coords
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
task_id: 0
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (2,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(6, 8, None)
task_coords
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
task_id: 2
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (1,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(3, 6, None)
task_coords
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
task_id: 1
input_data: None
date_time: None
******************************
[12]:
([42, 42, 42],)

Reduce Graph

The reduce function takes the graph created by the map function and adds a reduce graph that combines the outputs using one of two methods:

  • single_node: where the output from all map nodes is sent to a single node,

  • tree: where the outputs are combined using a binary tree reduction.

The function that forms the nodes in the reduce portion of the graph must have two parameters. The first receives the output from the mapping nodes (referred to as input_data here and named graph_inputs in the examples below). The second, input_params, comes from the reduce parameter with the same name.

[13]:
# Single Node Reduce
from graphviper.graph_tools import reduce
import numpy as np


def my_sum(graph_inputs, input_params):
    print(graph_inputs)
    # graph_inputs is a Python list, so convert it before dividing.
    return np.sum(np.asarray(graph_inputs) / input_params["test_input"])


input_params = {}
input_params["test_input"] = 5
viper_graph_reduce = reduce(
    viper_graph, my_sum, input_params, mode="single_node"
)  # mode is either "tree" or "single_node"

print(viper_graph_reduce)

dask_graph_reduce = generate_dask_workflow(viper_graph_reduce)
dask.visualize(dask_graph_reduce)
{'map': {'node_task': <function my_func at 0x7f4d650f13a0>, 'input_params': [{'test_input': 42, 'chunk_indices': (0,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(0, 3, None)}}, 'task_coords': {'frequency': {'data': array([3.43928097e+11, 3.43939328e+11, 3.43950560e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 0, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (1,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(3, 6, None)}}, 'task_coords': {'frequency': {'data': array([3.43961791e+11, 3.43973023e+11, 3.43984254e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 1, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (2,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(6, 8, None)}}, 'task_coords': {'frequency': {'data': array([3.43995486e+11, 3.44006717e+11]), 'dims': 
'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 2, 'input_data': None, 'date_time': None}]}, 'reduce': {'mode': 'single_node', 'node_task': <function my_sum at 0x7f4d6448f740>, 'input_params': {'test_input': 5}}}
[13]:
_images/graph_building_tutorial_28_1.png
[14]:
# Tree Reduce
from graphviper.graph_tools import reduce
import numpy as np


def my_sum(graph_inputs, input_params):
    print(graph_inputs)
    return np.sum(graph_inputs) + input_params["test_input"]


input_params = {}
input_params["test_input"] = 5
viper_graph_reduce = reduce(
    viper_graph, my_sum, input_params, mode="tree"
)  # mode is either "tree" or "single_node"

dask_graph_reduce = generate_dask_workflow(viper_graph_reduce)
dask.visualize(dask_graph_reduce)
[14]:
_images/graph_building_tutorial_29_0.png
[15]:
display(HTML(dict_to_html(viper_graph)))
map
node_task:
input_params: [{'test_input': 42, 'chunk_indices': (0,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(0, 3, None)}}, 'task_coords': {'frequency': {'data': array([3.43928097e+11, 3.43939328e+11, 3.43950560e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 0, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (1,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(3, 6, None)}}, 'task_coords': {'frequency': {'data': array([3.43961791e+11, 3.43973023e+11, 3.43984254e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 1, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (2,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(6, 8, None)}}, 'task_coords': {'frequency': {'data': array([3.43995486e+11, 3.44006717e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 
'velocity_frame': 'lsrk'}}}, 'task_id': 2, 'input_data': None, 'date_time': None}]
reduce
mode: tree
node_task:
input_params
test_input: 5

Run Map Reduce Graph

[16]:
dask.compute(dask_graph_reduce)
test_input: 42
chunk_indices: (1,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(3, 6, None)
task_coords
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
task_id: 1
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (0,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(0, 3, None)
task_coords
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
task_id: 0
input_data: None
date_time: None
******************************
[42, 42]
test_input: 42
chunk_indices: (2,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(6, 8, None)
task_coords
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: frequency
attrs
units: Hz
type: spectral_coord
velocity_frame: lsrk
task_id: 2
input_data: None
date_time: None
******************************
[89, 42]
[16]:
(136,)
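The result of 136 can be checked by hand with a small pure-Python sketch of the two reduction modes. The functions single_node_reduce and tree_reduce are hypothetical stand-ins written for illustration, and the pairing order in the tree (adjacent outputs first, with an unpaired output carried to the next level) is an assumption that happens to reproduce the intermediate values [42, 42] and [89, 42] printed above.

```python
def my_sum(graph_inputs, input_params):
    # Same reduction as in the tree example: sum the inputs, then add 5.
    return sum(graph_inputs) + input_params["test_input"]


def single_node_reduce(outputs, node_task, input_params):
    """All map outputs are sent to one reduce node."""
    return node_task(list(outputs), input_params)


def tree_reduce(outputs, node_task, input_params):
    """Combine outputs pairwise, level by level, until one value remains."""
    level = list(outputs)
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level) - 1, 2):
            next_level.append(node_task([level[i], level[i + 1]], input_params))
        if len(level) % 2 == 1:
            next_level.append(level[-1])  # unpaired output moves up a level
        level = next_level
    return level[0]


map_outputs = [42, 42, 42]
input_params = {"test_input": 5}

print(single_node_reduce(map_outputs, my_sum, input_params))  # 131
print(tree_reduce(map_outputs, my_sum, input_params))         # 136
```

The two modes give different answers here because my_sum adds test_input at every reduce node: once for single_node, twice for the three-leaf tree. A reduce function whose result should not depend on the mode needs to be insensitive to how many times it is applied.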

Overlapping Frequency Map Reduce

Create Parallel Coordinates

[17]:
from graphviper.utils.display import dict_to_html
import dask

dask.config.set(scheduler="synchronous")
from xradio.vis.read_processing_set import read_processing_set
from IPython.display import HTML, display


ps = read_processing_set(
    ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
    intents=["OBSERVE_TARGET#ON_SOURCE"],
)
ms_xds = ps.get(1)
n_chunks = 3

parallel_coords = {}
freq_coord = ms_xds.frequency.to_dict()
# Here, we create overlapping data chunks. Currently, there is no convenience function available to assist with this.
freq_coord["data_chunks"] = {
    0: freq_coord["data"][0:4],
    1: freq_coord["data"][3:7],
    2: freq_coord["data"][4:8],
}
parallel_coords["frequency"] = freq_coord

display(HTML(dict_to_html(parallel_coords["frequency"])))
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
data: [343928096685.9587, 343939328174.9401, 343950559663.9216, 343961791152.903, 343973022641.88446, 343984254130.8659, 343995485619.84735, 344006717108.8288]
coords
frequency
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
data: [343928096685.9587, 343939328174.9401, 343950559663.9216, 343961791152.903, 343973022641.88446, 343984254130.8659, 343995485619.84735, 344006717108.8288]
name: frequency
data_chunks
0: [343928096685.9587, 343939328174.9401, 343950559663.9216, 343961791152.903]
1: [343961791152.903, 343973022641.88446, 343984254130.8659, 343995485619.84735]
2: [343973022641.88446, 343984254130.8659, 343995485619.84735, 344006717108.8288]
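
The overlapping chunks above are written out by hand as three explicit slices. A minimal sketch of how such chunks could be generated is given below. The helper name overlapping_chunks and its parameters are hypothetical and not part of GraphVIPER, and a list of channel numbers stands in for freq_coord["data"].

```python
def overlapping_chunks(values, chunk_size, starts):
    """Return {chunk_index: values[start:start + chunk_size]} for each start.

    Hypothetical helper for illustration only; it is not a GraphVIPER function.
    """
    chunks = {}
    for i, start in enumerate(starts):
        stop = start + chunk_size
        if start < 0 or stop > len(values):
            raise ValueError(f"chunk {i} ({start}:{stop}) falls outside the coordinate")
        chunks[i] = values[start:stop]
    return chunks


# Stand-in for freq_coord["data"]: eight channel numbers instead of frequencies in Hz.
channels = list(range(8))

# Same start offsets as the hand-written slices [0:4], [3:7] and [4:8].
data_chunks = overlapping_chunks(channels, chunk_size=4, starts=[0, 3, 4])
print(data_chunks)
```

The resulting dictionary has the same layout as freq_coord["data_chunks"], so it could be assigned in the same way once real coordinate values are used.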

Create Node Task Data Mapping

[18]:
from graphviper.graph_tools.coordinate_utils import (
    interpolate_data_coords_onto_parallel_coords,
)

node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
    parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
0
chunk_indices: (0,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(0, 4, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(0, 4, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(0, 4, None)
task_coords
frequency
data: [343928096685.9587, 343939328174.9401, 343950559663.9216, 343961791152.903]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
1
chunk_indices: (1,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(3, 7, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(3, 7, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(3, 7, None)
task_coords
frequency
data: [343961791152.903, 343973022641.88446, 343984254130.8659, 343995485619.84735]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
2
chunk_indices: (2,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(4, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(4, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(4, 8, None)
task_coords
frequency
data: [343973022641.88446, 343984254130.8659, 343995485619.84735, 344006717108.8288]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
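
The mapping above turns each chunk of coordinate values into an index slice per dataset. One way to picture that step is sketched below with the standard library only. This is an illustration under the assumption that the coordinate is sorted in ascending order; it is not the implementation of interpolate_data_coords_onto_parallel_coords, and chunk_to_slice is a hypothetical name.

```python
from bisect import bisect_left, bisect_right


def chunk_to_slice(coord_values, chunk_values):
    """Index slice of the ascending coord_values that spans min..max of chunk_values."""
    start = bisect_left(coord_values, min(chunk_values))
    stop = bisect_right(coord_values, max(chunk_values))
    return slice(start, stop)


# Frequency coordinate of the dataset in Hz, copied from the output above.
frequency = [
    343928096685.9587, 343939328174.9401, 343950559663.9216, 343961791152.903,
    343973022641.88446, 343984254130.8659, 343995485619.84735, 344006717108.8288,
]

# The hand-built overlapping chunks from the previous cell.
data_chunks = {0: frequency[0:4], 1: frequency[3:7], 2: frequency[4:8]}

slices = {i: chunk_to_slice(frequency, chunk) for i, chunk in data_chunks.items()}
for i, s in slices.items():
    print(i, s)
```

The three slices agree with the frequency entries listed under data_selection for nodes 0, 1 and 2.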

Map Graph

[19]:
from graphviper.graph_tools.map import map
import dask
from IPython.display import display, HTML
from xradio.vis.read_processing_set import read_processing_set


def my_func(input_params):
    display(HTML(dict_to_html(input_params)))

    print("*" * 30)
    return input_params["test_input"]


input_params = {}
input_params["test_input"] = 42

ps = read_processing_set(
    ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
    intents=["OBSERVE_TARGET#ON_SOURCE"],
)

viper_graph = map(
    input_data=ps,
    node_task_data_mapping=node_task_data_mapping,
    node_task=my_func,
    input_params=input_params,
)

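# generate_dask_workflow is not imported in this cell; it is assumed to be in scope from an earlier cell.
dask_graph = generate_dask_workflow(viper_graph)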

dask.visualize(dask_graph, filename="map_graph")
[19]:
_images/graph_building_tutorial_39_0.png

Run Map Graph

[20]:
dask.compute(dask_graph)
test_input: 42
chunk_indices: (0,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(0, 4, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(0, 4, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(0, 4, None)
task_coords
frequency
data: [343928096685.9587, 343939328174.9401, 343950559663.9216, 343961791152.903]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 0
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (1,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(3, 7, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(3, 7, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(3, 7, None)
task_coords
frequency
data: [343961791152.903, 343973022641.88446, 343984254130.8659, 343995485619.84735]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 1
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (2,)
parallel_dims: ['frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
frequency: slice(4, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
frequency: slice(4, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
frequency: slice(4, 8, None)
task_coords
frequency
data: [343973022641.88446, 343984254130.8659, 343995485619.84735, 344006717108.8288]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 2
input_data: None
date_time: None
******************************
[20]:
([42, 42, 42],)
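
The node task above only echoes test_input. A sketch of how a task might use the data_selection entry it receives is given below. The dictionary fake_ps and the short dataset names are placeholders standing in for the Processing Set, and count_selected_channels is a hypothetical task; a real task would load the selected data from the store instead.

```python
# Stand-in for the Processing Set: each "dataset" holds a list of eight channel numbers.
# The short names are placeholders for the long Antennae_North dataset names.
fake_ps = {f"field_id_{i}": {"frequency": list(range(8))} for i in range(3)}


def count_selected_channels(input_params):
    """Toy node task: count the channels selected from each dataset."""
    counts = {}
    for name, selection in input_params["data_selection"].items():
        # Apply the per-dataset frequency slice, as listed under data_selection.
        selected = fake_ps[name]["frequency"][selection["frequency"]]
        counts[name] = len(selected)
    return counts


# Parameters shaped like those of node 1 in the overlapping example (slice(3, 7)).
input_params = {
    "test_input": 42,
    "task_id": 1,
    "data_selection": {name: {"frequency": slice(3, 7)} for name in fake_ps},
}

counts = count_selected_channels(input_params)
print(counts)
```

Because every node receives its own data_selection, overlapping chunks need no special handling inside the task: each node simply reads the slice it was given.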

Baseline and Frequency Map Reduce

Create Parallel Coordinates

[21]:
from graphviper.utils.display import dict_to_html
from graphviper.graph_tools.coordinate_utils import make_parallel_coord
import dask

dask.config.set(scheduler="synchronous")

from xradio.vis.read_processing_set import read_processing_set

from IPython.display import HTML, display

intents = ["OBSERVE_TARGET#ON_SOURCE"]
ps = read_processing_set(
    ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
    intents=intents,
)
ms_xds = ps.get(1)

parallel_coords = {}

import xarray as xr
import numpy as np

n_chunks = 4
parallel_coords["baseline_id"] = make_parallel_coord(
    coord=ms_xds.baseline_id, n_chunks=n_chunks
)

n_chunks = 3
parallel_coords["frequency"] = make_parallel_coord(
    coord=ms_xds.frequency, n_chunks=n_chunks
)

display(HTML(dict_to_html(parallel_coords)))
baseline_id
data: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
data_chunks
0: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
1: [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
2: [32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
3: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
data_chunks_edges: [0, 15, 16, 31, 32, 47, 48, 63]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11 3.43961791e+11 3.43973023e+11 3.43984254e+11 3.43995486e+11 3.44006717e+11]
data_chunks
0: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
1: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
2: [3.43995486e+11 3.44006717e+11]
data_chunks_edges: [343928096685.9587, 343950559663.9216, 343961791152.903, 343984254130.8659, 343995485619.84735, 344006717108.8288]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
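
The output above shows 64 baselines split into four chunks of 16 and eight frequencies split into chunks of 3, 3 and 2, with data_chunks_edges holding the first and last value of each chunk. The sketch below reproduces that pattern with plain lists. It assumes the split gives the leading chunks one extra element when the length does not divide evenly, which is consistent with the output shown but is not taken from the make_parallel_coord source; both function names are hypothetical.

```python
def split_into_chunks(values, n_chunks):
    """Split values into n_chunks contiguous chunks of near-equal size."""
    base, extra = divmod(len(values), n_chunks)
    chunks, start = {}, 0
    for i in range(n_chunks):
        # The first `extra` chunks get one additional element.
        stop = start + base + (1 if i < extra else 0)
        chunks[i] = values[start:stop]
        start = stop
    return chunks


def chunk_edges(chunks):
    """Flat list holding the first and last value of every chunk."""
    edges = []
    for chunk in chunks.values():
        edges.extend([chunk[0], chunk[-1]])
    return edges


baseline_chunks = split_into_chunks(list(range(64)), n_chunks=4)
# Channel numbers stand in for the eight frequency values.
frequency_chunks = split_into_chunks(list(range(8)), n_chunks=3)

print(chunk_edges(baseline_chunks))
print([len(c) for c in frequency_chunks.values()])
```

The baseline edges printed here correspond to the data_chunks_edges entry for baseline_id, and the frequency chunk sizes correspond to the three frequency chunks listed above.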

Create Node Task Data Mapping

[22]:
from graphviper.graph_tools.coordinate_utils import (
    interpolate_data_coords_onto_parallel_coords,
)

node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
    parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
0
chunk_indices: (0, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(0, 16, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(0, 16, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(0, 16, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
1
chunk_indices: (0, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(0, 16, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(0, 16, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(0, 16, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
2
chunk_indices: (0, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(0, 16, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(0, 16, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(0, 16, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
3
chunk_indices: (1, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(16, 32, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(16, 32, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(16, 32, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
4
chunk_indices: (1, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(16, 32, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(16, 32, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(16, 32, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
5
chunk_indices: (1, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(16, 32, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(16, 32, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(16, 32, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
6
chunk_indices: (2, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(32, 48, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(32, 48, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(32, 48, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
7
chunk_indices: (2, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(32, 48, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(32, 48, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(32, 48, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
8
chunk_indices: (2, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(32, 48, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(32, 48, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(32, 48, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
9
chunk_indices: (3, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(48, 64, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(48, 64, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(48, 64, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
10
chunk_indices: (3, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(48, 64, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(48, 64, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(48, 64, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
11
chunk_indices: (3, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(48, 64, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(48, 64, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(48, 64, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
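
With two parallel dimensions, the mapping above contains one node per combination of a baseline chunk and a frequency chunk, which gives 4 x 3 = 12 nodes. The numbering in the output matches a row-major enumeration of the chunk indices, sketched below with itertools.product. This illustrates the observed ordering and is not a description of how GraphVIPER builds the mapping internally.

```python
from itertools import product

# Number of chunks per parallel dimension, in the order shown under parallel_dims.
n_chunks = {"baseline_id": 4, "frequency": 3}
parallel_dims = list(n_chunks)

# Row-major enumeration: the last dimension (frequency) varies fastest.
chunk_index_ranges = [range(n_chunks[dim]) for dim in parallel_dims]
tasks = dict(enumerate(product(*chunk_index_ranges)))

for task_id, chunk_indices in tasks.items():
    print(task_id, chunk_indices)
```

For example, task 5 maps to chunk_indices (1, 2), the second baseline chunk and the third frequency chunk, as in the listing above.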

Map Graph

[23]:
from graphviper.graph_tools.map import map
import dask
from IPython.display import display, HTML


def my_func(input_params):
    display(HTML(dict_to_html(input_params)))

    print("*" * 30)
    return input_params["test_input"]


# Keys the node task receives in input_params (see the output of the run below):
# ['test_input', 'chunk_indices', 'parallel_dims', 'data_selection', 'task_coords', 'task_id', 'input_data', 'date_time']
input_params = {}
input_params["test_input"] = 42

viper_graph = map(
    input_data=ps,
    node_task_data_mapping=node_task_data_mapping,
    node_task=my_func,
    input_params=input_params,
)

# generate_dask_workflow is assumed to be in scope from an earlier cell.
dask_graph = generate_dask_workflow(viper_graph)
dask.visualize(dask_graph, filename="map_graph")
[23]:
_images/graph_building_tutorial_48_0.png

Run Map Graph

[24]:
dask.compute(dask_graph)
test_input: 42
chunk_indices: (1, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(16, 32, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(16, 32, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(16, 32, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 5
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (2, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(32, 48, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(32, 48, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(32, 48, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 7
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (3, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(48, 64, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(48, 64, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(48, 64, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 10
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (0, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(0, 16, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(0, 16, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(0, 16, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 2
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (2, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(32, 48, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(32, 48, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(32, 48, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 8
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (3, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(48, 64, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(48, 64, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(48, 64, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 9
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (0, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(0, 16, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(0, 16, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(0, 16, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 0
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (2, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(32, 48, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(32, 48, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(32, 48, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 6
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (1, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(16, 32, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(16, 32, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(16, 32, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 4
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (0, 1)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(0, 16, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(0, 16, None)
frequency: slice(3, 6, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(0, 16, None)
frequency: slice(3, 6, None)
task_coords
baseline_id
data: [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
dims: ('baseline_id',)
attrs
frequency
data: [3.43961791e+11 3.43973023e+11 3.43984254e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 1
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (1, 0)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(16, 32, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(16, 32, None)
frequency: slice(0, 3, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(16, 32, None)
frequency: slice(0, 3, None)
task_coords
baseline_id
data: [16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31]
dims: ('baseline_id',)
attrs
frequency
data: [3.43928097e+11 3.43939328e+11 3.43950560e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 3
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (3, 2)
parallel_dims: ['baseline_id', 'frequency']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
baseline_id: slice(48, 64, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
baseline_id: slice(48, 64, None)
frequency: slice(6, 8, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
baseline_id: slice(48, 64, None)
frequency: slice(6, 8, None)
task_coords
baseline_id
data: [48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63]
dims: ('baseline_id',)
attrs
frequency
data: [3.43995486e+11 3.44006717e+11]
dims: ('frequency',)
attrs
channel_width
attrs
type: quantity
units: ['Hz']
data: 11231488.981445312
dims:
frame: LSRK
reference_frequency
attrs
frame: LSRK
type: spectral_coord
units: ['Hz']
data: 343928096685.9587
dims:
spectral_window_name:
spw_id: 0
type: spectral_coord
units: ['Hz']
task_id: 11
input_data: None
date_time: None
******************************
[24]:
([42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42],)
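
The twelve results above correspond to one node per combination of baseline_id chunk and frequency chunk. Judging from the printed chunk_indices and task_id values (for example (1, 1) with task_id 4 and (3, 2) with task_id 11), the tasks appear to be numbered in row-major order over the chunk grid. The sketch below reproduces that numbering with itertools.product. The chunk counts (4 and 3) are inferred from the output, and enumerate_tasks is a hypothetical helper, not part of GraphVIPER.

```python
from itertools import product


def enumerate_tasks(n_chunks_per_dim):
    """Return {task_id: chunk_indices} for a row-major walk over the chunk grid.

    n_chunks_per_dim maps each parallel dimension to its number of chunks.
    Hypothetical helper, for illustration only.
    """
    ranges = [range(n) for n in n_chunks_per_dim.values()]
    return {task_id: idx for task_id, idx in enumerate(product(*ranges))}


# Chunk counts inferred from the output above: 4 baseline chunks, 3 frequency chunks.
tasks = enumerate_tasks({"baseline_id": 4, "frequency": 3})

print(len(tasks))  # 12
print(tasks[4])    # (1, 1)
print(tasks[11])   # (3, 2)
```

The last dimension varies fastest, so task_id equals baseline_chunk * 3 + frequency_chunk for this grid.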

Time Map Reduce

Create Parallel Coordinates

[25]:
import dask
import numpy as np
import xarray as xr
from IPython.display import HTML, display
from xradio.vis.read_processing_set import read_processing_set

from graphviper.graph_tools.coordinate_utils import make_parallel_coord
from graphviper.utils.display import dict_to_html

# Use Dask's synchronous (single-threaded) scheduler so that output printed
# from inside the tasks is easy to follow.
dask.config.set(scheduler="synchronous")

intents = ["OBSERVE_TARGET#ON_SOURCE"]
ps = read_processing_set(
    ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
    intents=intents,
)
ms_xds = ps.get(1)  # a single dataset from the Processing Set (not used below)

parallel_coords = {}

# Combine the time coordinates of all three datasets into one sorted time axis.
t0, t1, t2 = (ps.get(1).time, ps.get(0).time, ps.get(2).time)
time_coord = xr.concat([t0, t1, t2], dim="time").sortby("time").to_dict()

# Split the combined time axis into n_chunks chunks, one per graph node.
n_chunks = 4
parallel_coords["time"] = make_parallel_coord(coord=time_coord, n_chunks=n_chunks)
display(HTML(dict_to_html(parallel_coords["time"])))
data: [1306547230.1759996, 1306547236.224, 1306547242.2720003, 1306547248.3199997, 1306547254.368, 1306547265.4560003, 1306547271.5039997, 1306547277.552, 1306547283.5999994, 1306547289.6479998, 1306547300.736, 1306547306.7840004, 1306547312.8319998, 1306547318.88, 1306547324.9279995, 1306548534.0480003, 1306548540.0959997, 1306548546.144, 1306548552.1919994, 1306548558.2399998, 1306548569.328, 1306548575.3760004, 1306548581.4240007, 1306548587.4720001, 1306548593.5199995, 1306548604.6079998, 1306548610.6560001, 1306548616.7040005, 1306548622.7519999, 1306548628.8000002, 1306549611.8400002, 1306549617.8879995, 1306549623.9359999, 1306549629.9840002, 1306549636.0320005, 1306549647.12, 1306549653.1680002, 1306549659.2159996, 1306549665.264, 1306549671.3120003, 1306549682.4000006, 1306549688.448, 1306549694.4960003, 1306549700.5439997, 1306549706.592, 1306550670.0959997, 1306550676.144, 1306550682.1919994, 1306550688.2399998, 1306550694.288, 1307136638.6879997, 1307136644.736, 1307136650.7839994, 1307136656.8319998, 1307136662.88, 1307136673.9680004, 1307136680.0159998, 1307136686.0640001, 1307136692.1119995, 1307136698.1599998, 1307136709.2480001, 1307136715.2960005, 1307136721.3439999, 1307136727.3920002, 1307136733.4400005, 1307137970.88, 1307137976.9279995, 1307137982.9759998, 1307137989.0240002, 1307137995.0719995, 1307138006.1599998, 1307138012.2080002, 1307138018.2560005, 1307138024.304, 1307138030.3519993, 1307138041.4399996, 1307138047.488, 1307138053.5360003, 1307138059.5840006, 1307138065.632, 1307139057.12, 1307139063.1680002, 1307139069.2159996, 1307139075.264, 1307139081.3119993, 1307139092.3999996, 1307139098.448, 1307139104.4960003, 1307139110.5440006, 1307139116.592, 1307139127.6799994, 1307139133.7279997, 1307139139.776, 1307139145.8240004, 1307139151.8719997, 1307140121.7600002, 1307140127.8079996, 1307140133.856, 1307140139.9039993, 1307140145.9519997, 1307147559.9359999, 1307147565.9840002, 1307147572.0319996, 1307147578.08, 1307147584.1280003, 
1307147595.2160006, 1307147601.264, 1307147607.3120003, 1307147613.3600006, 1307147619.408, 1307147630.4960003, 1307147636.5439997, 1307147642.592, 1307147648.6400003, 1307147654.6880007]
data_chunks
0: [1.30654723e+09 1.30654724e+09 1.30654724e+09 1.30654725e+09 1.30654725e+09 1.30654727e+09 1.30654727e+09 1.30654728e+09 1.30654728e+09 1.30654729e+09 1.30654730e+09 1.30654731e+09 1.30654731e+09 1.30654732e+09 1.30654732e+09 1.30654853e+09 1.30654854e+09 1.30654855e+09 1.30654855e+09 1.30654856e+09 1.30654857e+09 1.30654858e+09 1.30654858e+09 1.30654859e+09 1.30654859e+09 1.30654860e+09 1.30654861e+09 1.30654862e+09 1.30654862e+09]
1: [1.30654863e+09 1.30654961e+09 1.30654962e+09 1.30654962e+09 1.30654963e+09 1.30654964e+09 1.30654965e+09 1.30654965e+09 1.30654966e+09 1.30654967e+09 1.30654967e+09 1.30654968e+09 1.30654969e+09 1.30654969e+09 1.30654970e+09 1.30654971e+09 1.30655067e+09 1.30655068e+09 1.30655068e+09 1.30655069e+09 1.30655069e+09 1.30713664e+09 1.30713664e+09 1.30713665e+09 1.30713666e+09 1.30713666e+09 1.30713667e+09 1.30713668e+09 1.30713669e+09]
2: [1.30713669e+09 1.30713670e+09 1.30713671e+09 1.30713672e+09 1.30713672e+09 1.30713673e+09 1.30713673e+09 1.30713797e+09 1.30713798e+09 1.30713798e+09 1.30713799e+09 1.30713800e+09 1.30713801e+09 1.30713801e+09 1.30713802e+09 1.30713802e+09 1.30713803e+09 1.30713804e+09 1.30713805e+09 1.30713805e+09 1.30713806e+09 1.30713807e+09 1.30713906e+09 1.30713906e+09 1.30713907e+09 1.30713908e+09 1.30713908e+09 1.30713909e+09 1.30713910e+09]
3: [1.30713910e+09 1.30713911e+09 1.30713912e+09 1.30713913e+09 1.30713913e+09 1.30713914e+09 1.30713915e+09 1.30713915e+09 1.30714012e+09 1.30714013e+09 1.30714013e+09 1.30714014e+09 1.30714015e+09 1.30714756e+09 1.30714757e+09 1.30714757e+09 1.30714758e+09 1.30714758e+09 1.30714760e+09 1.30714760e+09 1.30714761e+09 1.30714761e+09 1.30714762e+09 1.30714763e+09 1.30714764e+09 1.30714764e+09 1.30714765e+09 1.30714765e+09]
data_chunks_edges: [1306547230.1759996, 1306548622.7519999, 1306548628.8000002, 1307136686.0640001, 1307136692.1119995, 1307139098.448, 1307139104.4960003, 1307147654.6880007]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
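
The data_chunks output above splits the 115 combined time samples into chunks of 29, 29, 29 and 28 values, and data_chunks_edges lists the first and last value of each chunk. The sketch below reproduces that layout in plain Python, giving the earlier chunks one extra element when the length is not divisible by n_chunks (the same rule numpy.array_split uses). The names split_into_chunks and chunk_edges are hypothetical, and this is an illustration of the observed output, not GraphVIPER's implementation of make_parallel_coord.

```python
def split_into_chunks(values, n_chunks):
    """Split a sequence into n_chunks contiguous, near-equal chunks."""
    base, extra = divmod(len(values), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        size = base + (1 if i < extra else 0)  # first `extra` chunks get one more
        chunks.append(values[start:start + size])
        start += size
    return chunks


def chunk_edges(chunks):
    """Flatten the [first, last] value of every non-empty chunk into one list."""
    edges = []
    for chunk in chunks:
        if chunk:
            edges.extend([chunk[0], chunk[-1]])
    return edges


# Stand-in for the 115 sorted time samples (synthetic values, 6.048 s apart).
times = [i * 6.048 for i in range(115)]
chunks = split_into_chunks(times, n_chunks=4)

print([len(c) for c in chunks])  # [29, 29, 29, 28]
print(len(chunk_edges(chunks)))  # 8
```

With four chunks there are eight edge values, matching the length of data_chunks_edges in the output.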

Create Node Task Data Mapping

[26]:
from graphviper.graph_tools.coordinate_utils import (
    interpolate_data_coords_onto_parallel_coords,
)

node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
    parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
0
chunk_indices: (0,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(0, 10, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(0, 10, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(0, 9, None)
task_coords
time
data: [1.30654723e+09 1.30654724e+09 1.30654724e+09 1.30654725e+09 1.30654725e+09 1.30654727e+09 1.30654727e+09 1.30654728e+09 1.30654728e+09 1.30654729e+09 1.30654730e+09 1.30654731e+09 1.30654731e+09 1.30654732e+09 1.30654732e+09 1.30654853e+09 1.30654854e+09 1.30654855e+09 1.30654855e+09 1.30654856e+09 1.30654857e+09 1.30654858e+09 1.30654858e+09 1.30654859e+09 1.30654859e+09 1.30654860e+09 1.30654861e+09 1.30654862e+09 1.30654862e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
1
chunk_indices: (1,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(9, 25, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(9, 18, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(9, 16, None)
task_coords
time
data: [1.30654863e+09 1.30654961e+09 1.30654962e+09 1.30654962e+09 1.30654963e+09 1.30654964e+09 1.30654965e+09 1.30654965e+09 1.30654966e+09 1.30654967e+09 1.30654967e+09 1.30654968e+09 1.30654969e+09 1.30654969e+09 1.30654970e+09 1.30654971e+09 1.30655067e+09 1.30655068e+09 1.30655068e+09 1.30655069e+09 1.30655069e+09 1.30713664e+09 1.30713664e+09 1.30713665e+09 1.30713666e+09 1.30713666e+09 1.30713667e+09 1.30713668e+09 1.30713669e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
2
chunk_indices: (2,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(24, 35, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(18, 27, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(15, 26, None)
task_coords
time
data: [1.30713669e+09 1.30713670e+09 1.30713671e+09 1.30713672e+09 1.30713672e+09 1.30713673e+09 1.30713673e+09 1.30713797e+09 1.30713798e+09 1.30713798e+09 1.30713799e+09 1.30713800e+09 1.30713801e+09 1.30713801e+09 1.30713802e+09 1.30713802e+09 1.30713803e+09 1.30713804e+09 1.30713805e+09 1.30713805e+09 1.30713806e+09 1.30713807e+09 1.30713906e+09 1.30713906e+09 1.30713907e+09 1.30713908e+09 1.30713908e+09 1.30713909e+09 1.30713910e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
3
chunk_indices: (3,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(34, -1, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(27, -1, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(25, 35, None)
task_coords
time
data: [1.30713910e+09 1.30713911e+09 1.30713912e+09 1.30713913e+09 1.30713913e+09 1.30713914e+09 1.30713915e+09 1.30713915e+09 1.30714012e+09 1.30714013e+09 1.30714013e+09 1.30714014e+09 1.30714015e+09 1.30714756e+09 1.30714757e+09 1.30714757e+09 1.30714758e+09 1.30714758e+09 1.30714760e+09 1.30714760e+09 1.30714761e+09 1.30714761e+09 1.30714762e+09 1.30714763e+09 1.30714764e+09 1.30714764e+09 1.30714765e+09 1.30714765e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
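
Each data_selection entry above is a per-dataset slice along time, presumably found by locating a chunk's edge values on that dataset's own time axis. A simplified version of that lookup can be sketched with bisect on a sorted coordinate. This is only an approximation under stated assumptions: selection_for_chunk is a hypothetical helper, the time values are synthetic, and the sketch returns non-overlapping slices, whereas the GraphVIPER output above shows neighbouring slices sharing a sample.

```python
from bisect import bisect_left, bisect_right


def selection_for_chunk(coord, lo, hi):
    """Return the slice of sorted `coord` whose values lie in [lo, hi]."""
    start = bisect_left(coord, lo)
    stop = bisect_right(coord, hi)
    return slice(start, stop)


# Two synthetic datasets with interleaved time samples.
times_a = [0.0, 2.0, 4.0, 6.0, 8.0, 10.0]
times_b = [1.0, 3.0, 5.0, 7.0, 9.0, 11.0]

# Chunk edges as (first, last) pairs, in the spirit of data_chunks_edges.
edges = [(0.0, 5.0), (6.0, 11.0)]

mapping = {
    task_id: {
        "data_selection": {
            "a": {"time": selection_for_chunk(times_a, lo, hi)},
            "b": {"time": selection_for_chunk(times_b, lo, hi)},
        }
    }
    for task_id, (lo, hi) in enumerate(edges)
}

print(mapping[0]["data_selection"]["a"]["time"])  # slice(0, 3, None)
print(mapping[1]["data_selection"]["b"]["time"])  # slice(3, 6, None)
```

A node can then apply its slices to the matching dataset, so that only the samples belonging to its chunk are loaded.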

Map Graph

[27]:
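from graphviper.graph_tools.map import map

# generate_dask_workflow is called below; this import path is assumed and is
# only needed if an earlier cell has not already imported the function.
from graphviper.graph_tools.generate_dask_workflow import generate_dask_workflow
import dask
from IPython.display import display, HTML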


def my_func(input_params):
    display(HTML(dict_to_html(input_params)))

    print("*" * 30)
    return input_params["test_input"]


# Keys each node receives (see the run output): ['test_input', 'chunk_indices', 'parallel_dims', 'data_selection', 'task_coords', 'task_id', 'input_data', 'date_time']
input_params = {}
input_params["test_input"] = 42

viper_graph = map(
    input_data=ps,
    node_task_data_mapping=node_task_data_mapping,
    node_task=my_func,
    input_params=input_params,
)

dask_graph = generate_dask_workflow(viper_graph)
dask.visualize(dask_graph, filename="map_graph")
[27]:
Figure: visualization of the map graph (_images/graph_building_tutorial_57_0.png)

Run Map Graph

[28]:
dask.compute(dask_graph)
test_input: 42
chunk_indices: (2,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(24, 35, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(18, 27, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(15, 26, None)
task_coords
time
data: [1.30713669e+09 1.30713670e+09 1.30713671e+09 1.30713672e+09 1.30713672e+09 1.30713673e+09 1.30713673e+09 1.30713797e+09 1.30713798e+09 1.30713798e+09 1.30713799e+09 1.30713800e+09 1.30713801e+09 1.30713801e+09 1.30713802e+09 1.30713802e+09 1.30713803e+09 1.30713804e+09 1.30713805e+09 1.30713805e+09 1.30713806e+09 1.30713807e+09 1.30713906e+09 1.30713906e+09 1.30713907e+09 1.30713908e+09 1.30713908e+09 1.30713909e+09 1.30713910e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
task_id: 2
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (3,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(34, -1, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(27, -1, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(25, 35, None)
task_coords
time
data: [1.30713910e+09 1.30713911e+09 1.30713912e+09 1.30713913e+09 1.30713913e+09 1.30713914e+09 1.30713915e+09 1.30713915e+09 1.30714012e+09 1.30714013e+09 1.30714013e+09 1.30714014e+09 1.30714015e+09 1.30714756e+09 1.30714757e+09 1.30714757e+09 1.30714758e+09 1.30714758e+09 1.30714760e+09 1.30714760e+09 1.30714761e+09 1.30714761e+09 1.30714762e+09 1.30714763e+09 1.30714764e+09 1.30714764e+09 1.30714765e+09 1.30714765e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
task_id: 3
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (1,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(9, 25, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(9, 18, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(9, 16, None)
task_coords
time
data: [1.30654863e+09 1.30654961e+09 1.30654962e+09 1.30654962e+09 1.30654963e+09 1.30654964e+09 1.30654965e+09 1.30654965e+09 1.30654966e+09 1.30654967e+09 1.30654967e+09 1.30654968e+09 1.30654969e+09 1.30654969e+09 1.30654970e+09 1.30654971e+09 1.30655067e+09 1.30655068e+09 1.30655068e+09 1.30655069e+09 1.30655069e+09 1.30713664e+09 1.30713664e+09 1.30713665e+09 1.30713666e+09 1.30713666e+09 1.30713667e+09 1.30713668e+09 1.30713669e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
task_id: 1
input_data: None
date_time: None
******************************
test_input: 42
chunk_indices: (0,)
parallel_dims: ['time']
data_selection
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0
time: slice(0, 10, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1
time: slice(0, 10, None)
Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2
time: slice(0, 9, None)
task_coords
time
data: [1.30654723e+09 1.30654724e+09 1.30654724e+09 1.30654725e+09 1.30654725e+09 1.30654727e+09 1.30654727e+09 1.30654728e+09 1.30654728e+09 1.30654729e+09 1.30654730e+09 1.30654731e+09 1.30654731e+09 1.30654732e+09 1.30654732e+09 1.30654853e+09 1.30654854e+09 1.30654855e+09 1.30654855e+09 1.30654856e+09 1.30654857e+09 1.30654858e+09 1.30654858e+09 1.30654859e+09 1.30654859e+09 1.30654860e+09 1.30654861e+09 1.30654862e+09 1.30654862e+09]
dims: ('time',)
attrs
effective_integration_time: EFFECTIVE_INTEGRATION_TIME
format: unix
integration_time
attrs
type: quantity
units: ['s']
data: 6.048
dims:
scale: UTC
type: time
units: ['s']
task_id: 0
input_data: None
date_time: None
******************************
[28]:
([42, 42, 42, 42],)
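
The map and compute steps can be approximated with plain dask.delayed, which the tutorial introduction names as the mechanism behind map. The sketch below is an assumption about the general pattern, not GraphVIPER's code: sketch_map and the toy mapping are hypothetical. It also shows why the result above is a one-element tuple: dask.compute returns one entry per argument, and the single argument here is a list of delayed nodes.

```python
import copy

import dask


def node_task(input_params):
    # Each node returns the shared test value, as my_func does.
    return input_params["test_input"]


def sketch_map(node_task_data_mapping, node_task, input_params):
    """Create one dask.delayed node per task (hypothetical stand-in for map)."""
    nodes = []
    for task_id, task_info in node_task_data_mapping.items():
        params = copy.deepcopy(input_params)  # keep tasks independent
        params.update(task_info)
        params["task_id"] = task_id
        nodes.append(dask.delayed(node_task)(params))
    return nodes


# Toy mapping with four time chunks, mirroring the chunk_indices above.
toy_mapping = {i: {"chunk_indices": (i,), "parallel_dims": ["time"]} for i in range(4)}

graph = sketch_map(toy_mapping, node_task, {"test_input": 42})
result = dask.compute(graph, scheduler="synchronous")

print(result)  # ([42, 42, 42, 42],)
```

The results come back in list order even though, as in the run above, the nodes may execute in a different order.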