Open in Colab: https://colab.research.google.com/github/casangi/graph_viper/blob/master/docs/graph_building_tutorial.ipynb
GraphVIPER Tutorial
This tutorial provides examples of how GraphVIPER can be used to build Dask graphs by mapping a dictionary-based container of xarray.Datasets to Dask graph nodes, followed by a reduction step. The dictionary of xarray.Datasets used in this tutorial is referred to as a Processing Set, although any dictionary containing xarray.Datasets can be used. The GraphVIPER map and reduce functions can be thought of as a generalization of xarray.map_blocks that can be applied to more than one xarray.Dataset. Both map and reduce build Dask graphs using dask.delayed.
The following types of mapping are supported:
Partitions defined by any combination of the coordinates in the Processing Set.
More than one xarray.Dataset can be assigned to a single mapping node.
xarray.Dataset partitions assigned to different nodes can have coordinates that overlap.
The tutorial will cover the following examples:
Frequency Map Reduce: This example explains the concepts of parallel_coords and node_task_data_mapping that define parallelism.
Overlapping Frequency Map Reduce.
Baseline and Frequency Map Reduce.
Time Map Reduce.
GraphVIPER provides improvements over the CNGI prototype:
There is a clear separation between the concurrency layer (GraphVIPER) and the domain layer (science code, AstroVIPER).
The memory backpressure issue was solved by incorporating the loading of data into the compute nodes. An example of the memory backpressure issue is cube imaging where large in-memory image cubes have to be created, which Dask is not aware of, causing Dask to be overeager in loading data from disk into memory. In the future, Dask might provide an alternative solution where graph nodes can be annotated with expected memory usage.
The number of graph nodes has been minimized; this was also achieved by incorporating the loading of data into the compute nodes. When Dask-backed xarray.Datasets are used, a node is created for each data variable, and since radio astronomy datasets have numerous data variables, this led to a bloated graph that impacted scaling performance.
Multiple xarray.Datasets can be processed together with overlap. This cannot be done with the current Xarray functionality, such as xarray.map_blocks.
Using a Dask plugin, the Dask Scheduler has been modified so that data can be cached to a local disk when multiple passes over larger-than-memory data have to be done. This reduces clustered file system or binary object store access (see GraphVIPER Client).
Install GraphVIPER
[1]:
import os
from importlib.metadata import version
try:
    import graphviper
    print("GraphVIPER version", version("graphviper"), "already installed.")
except ImportError as e:
    print(e)
    print("Installing GraphVIPER")
    os.system("pip install graphviper")
    import graphviper
    print("GraphVIPER version", version("graphviper"), "installed.")
GraphVIPER version 0.0.6 already installed.
Setup Dask Cluster
To simplify things, we are going to start off by using a single process (everything will run serially).
[2]:
# Start a local Dask client. The commented-out line would start a cluster with
# two workers and one thread each; here everything runs in a single process.
from graphviper.dask.client import local_client
# viper_client = local_client(cores=2, memory_limit="4GB", autorestrictor=True)
viper_client = local_client(serial_execution=True)
viper_client
[2024-04-10 13:45:14,954] INFO graphviper: Checking parameter values for client.local_client
[2024-04-10 13:45:14,954] INFO graphviper: Module path: /users/jsteeb/graphviper/
[2024-04-10 13:45:14,954] INFO graphviper: Searching /users/jsteeb/graphviper/src/graphviper/config/ for configuration file, please wait ...
[2024-04-10 13:45:14,960] WARNING client: It is recommended that the local cache directory be set using the dask_local_dir parameter.
[2024-04-10 13:45:14,960] INFO client: Running client in synchronous mode.
Download and Convert Dataset
[3]:
from graphviper.utils.data import download
download(file="Antennae_North.cal.lsrk.split.ms")
from xradio.vis.convert_msv2_to_processing_set import convert_msv2_to_processing_set
# The chunksize on disk. Chunksize can be specified for any of the following dimensions :
# time, baseline_id (interferometer) / antenna_id (single dish), frequency, and polarization.
chunks_on_disk = {"frequency": 3}
infile = "Antennae_North.cal.lsrk.split.ms"
outfile = "Antennae_North.cal.lsrk.split.vis.zarr"
convert_msv2_to_processing_set(
in_file=infile,
out_file=outfile,
parallel=False,
overwrite=True,
main_chunksize=chunks_on_disk,
)
[2024-04-10 13:45:14,967] INFO client: File exists: Antennae_North.cal.lsrk.split.ms
Inspect the Processing Set
The read_processing_set function is lazy, so no data is loaded into memory; only metadata is loaded (load_processing_set, in contrast, loads everything into memory). Metadata is defined as everything that is not an xarray.DataArray. Note that a Processing Set does not have to be used with GraphVIPER; any dictionary of xarray.Datasets can be used.
[4]:
import pandas as pd
pd.options.display.max_colwidth = 100
ps_store = "Antennae_North.cal.lsrk.split.vis.zarr"
from xradio.vis.read_processing_set import read_processing_set
intents = ["OBSERVE_TARGET#ON_SOURCE"]
fields = None
ps = read_processing_set(
ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
intents=intents,
fields=fields,
)
display(ps.summary())
|   | name | ddi | intent | field_id | field_name | start_frequency | end_frequency | shape | field_coords |
|---|------|-----|--------|----------|------------|-----------------|---------------|-------|--------------|
| 0 | Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0 | 0 | OBSERVE_TARGET#ON_SOURCE | 0 | NGC4038 - Antennae North | 3.439281e+11 | 3.440067e+11 | (45, 64, 8, 2) | [fk5, 12h01m52.43086055s, -18d52m02.92003728s] |
| 1 | Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1 | 0 | OBSERVE_TARGET#ON_SOURCE | 1 | NGC4038 - Antennae North | 3.439281e+11 | 3.440067e+11 | (35, 64, 8, 2) | [fk5, 12h01m52.95871137s, -18d52m02.92s] |
| 2 | Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2 | 0 | OBSERVE_TARGET#ON_SOURCE | 2 | NGC4038 - Antennae North | 3.439281e+11 | 3.440067e+11 | (35, 64, 8, 2) | [fk5, 12h01m53.48656218s, -18d52m02.91996277s] |
Inspect a single MS v4
The xarray.Datasets within a Processing Set are called Measurement Set v4 (MS v4).
[5]:
ms_xds = ps[
"Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0"
]
ms_xds
[5]:
<xarray.Dataset> Size: 716kB
Dimensions:                     (time: 45, baseline_id: 64, frequency: 8, polarization: 2, uvw_label: 3)
Coordinates:
    baseline_antenna1_id        (baseline_id) int32 256B dask.array<chunksize=(64,), meta=np.ndarray>
    baseline_antenna2_id        (baseline_id) int32 256B dask.array<chunksize=(64,), meta=np.ndarray>
  * baseline_id                 (baseline_id) int64 512B 0 1 2 3 ... 60 61 62 63
  * frequency                   (frequency) float64 64B 3.439e+11 ... 3.44e+11
  * polarization                (polarization) <U2 16B 'XX' 'YY'
  * time                        (time) float64 360B 1.307e+09 ... 1.307e+09
  * uvw_label                   (uvw_label) <U1 12B 'u' 'v' 'w'
Data variables:
    EFFECTIVE_INTEGRATION_TIME  (time, baseline_id) float64 23kB dask.array<chunksize=(45, 64), meta=np.ndarray>
    FLAG                        (time, baseline_id, frequency, polarization) bool 46kB dask.array<chunksize=(45, 64, 3, 2), meta=np.ndarray>
    TIME_CENTROID               (time, baseline_id) float64 23kB dask.array<chunksize=(45, 64), meta=np.ndarray>
    UVW                         (time, baseline_id, uvw_label) float64 69kB dask.array<chunksize=(45, 64, 3), meta=np.ndarray>
    VISIBILITY                  (time, baseline_id, frequency, polarization) complex64 369kB dask.array<chunksize=(45, 64, 3, 2), meta=np.ndarray>
    WEIGHT                      (time, baseline_id, frequency, polarization) float32 184kB dask.array<chunksize=(45, 64, 3, 2), meta=np.ndarray>
Attributes:
    data_groups: {'base': {'flag': 'FLAG', 'uvw': 'UVW', 'visibility': 'VISI...
    ddi: 0
    intent: OBSERVE_TARGET#ON_SOURCE
    antenna_xds: <xarray.Dataset> Size: 3kB\nDimensions: (antenna_id:...
Nomenclature
input_data: A dictionary of xarray.Datasets or a processing_set.
n_datasets: The number of xarray.Datasets in the input_data.
dim_i: The \(\text{i}^{\text{th}}\) dimension name.
n_dims: The number of dimensions over which parallelism will occur.
n_dim_i_chunks: The number of chunks into which the dimension coordinate dim_i has been divided.
n_nodes: The number of nodes in the mapping stage of a MapReduce graph.
_{}: If curly brackets are preceded by an underscore, this indicates a subscript and not a dictionary value.
How Graph Parallelism is Specified: parallel_coords
parallel_coords is a dictionary whose keys are the dimensions over which parallelism will occur; these can be any of the dimension coordinate names present in the input data. For the MS v4 xarray.Dataset, the options include time, baseline_id (interferometer) / antenna_id (single dish), frequency, and polarization. Each dimension coordinate name is associated with a dictionary that describes the data selection for that dimension in each node of the mapping stage of the graph.
The structure of parallel_coords:
parallel_coords = {
    dim_0: {
        'data': 1D list/np.ndarray of Number,
        'data_chunks': {
            0: 1D list/np.ndarray of Number,
            ⋮
            n_dim_0_chunks-1: ...,
        }
        'data_chunk_edges': 1D list/np.ndarray of Number,
        'dims': (dim_0,),
        'attrs': measure attribute,
    }
    ⋮
    dim_{n_dims-1}: ...
}
The dim_i dictionaries' keys have the following meanings:
data: An array containing all the coordinate values associated with that dimension. These values do not have to match the values in the coordinates of the input data, as those are interpolated onto these values. The minimum and maximum values can be respectively larger or smaller than the values in the coordinates of individual xarray.Datasets; this simply excludes that data from being processed. It's important to note that the parallel_coords and the input data coordinates must have the same measures attributes (reference frame, units, etc.).
data_chunks: A dictionary where the values are chunks of the data and the keys are integers. This chunking determines the parallelism of the graph. The values in the chunks can overlap.
data_chunk_edges: An array with the start and end values of each chunk.
dims: The dimension coordinate name.
attrs: The XRADIO measures attributes of the data (refer to the XRADIO documentation).
The combination of all the chunks in parallel_coords determines the parallelism of the graph. For example, a parallel_coords with 5 time chunks and 3 frequency chunks yields 15-way parallelism (5 × 3).
This description may seem somewhat convoluted, but the following examples should help clarify things.
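As a rough illustration of how chunk combinations translate into mapping nodes, here is a plain-Python sketch (no GraphVIPER required; the coordinate values below are hypothetical) that enumerates the chunk-index tuples for two parallel dimensions:

```python
from itertools import product

# Hypothetical parallel_coords with 2 time chunks and 3 frequency chunks.
parallel_coords = {
    "time": {"data_chunks": {0: [0.0, 1.0], 1: [2.0, 3.0]}},
    "frequency": {"data_chunks": {0: [100.0], 1: [200.0], 2: [300.0]}},
}

# Each mapping node is assigned one chunk index per parallel dimension,
# so the node count is the product of the per-dimension chunk counts.
dims = list(parallel_coords)
chunk_ids = [list(parallel_coords[d]["data_chunks"]) for d in dims]
chunk_indices = list(product(*chunk_ids))

print(len(chunk_indices))  # 6-way parallelism (2 x 3)
print(chunk_indices[0])    # (0, 0)
```

Each tuple in `chunk_indices` plays the role of the `chunk_indices` entry of one mapping node.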
Frequency Map Reduce
Create Parallel Coordinates
GraphVIPER offers a convenience function, make_parallel_coord, that converts any XRADIO measure to a parallel_coords entry. In this case, we will use the frequency coordinate of one of the datasets in the processing_set. It's worth noting that all datasets in this processing_set have the same frequency coordinates but differing time coordinates, because they represent the same spectral window but different fields of a mosaic observation.
[6]:
from graphviper.graph_tools.coordinate_utils import make_parallel_coord
from graphviper.utils.display import dict_to_html
from IPython.display import HTML, display
parallel_coords = {}
n_chunks = 3
parallel_coords["frequency"] = make_parallel_coord(
coord=ms_xds.frequency, n_chunks=n_chunks
)
display(HTML(dict_to_html(parallel_coords["frequency"])))
(HTML display of parallel_coords["frequency"] showing the data split into 3 data_chunks, with measures attrs)
The display of the frequency parallel_coords clearly shows how the data was split into 3 chunks. All the chunks must have the same number of values, except the last chunk, which can have fewer. GraphVIPER also has convenience functions that create frequency and time coordinate measures:
[7]:
from graphviper.graph_tools.coordinate_utils import make_frequency_coord
n_chunks = 3
coord = make_frequency_coord(
freq_start=343928096685.9587,
freq_delta=11231488.981445312,
n_channels=8,
velocity_frame="lsrk",
)
parallel_coords["frequency"] = make_parallel_coord(
coord=coord, n_chunks=n_chunks
)
display(HTML(dict_to_html(parallel_coords["frequency"])))
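The chunking rule stated above (equal-sized chunks, with only the last chunk allowed to be shorter) can be sketched in plain Python; this is an illustrative stand-in, not the library's implementation:

```python
import numpy as np

def chunk_coord(data, n_chunks):
    # Split data into n_chunks equal-sized chunks;
    # only the last chunk may hold fewer values.
    chunk_size = int(np.ceil(len(data) / n_chunks))
    return {
        i: data[i * chunk_size : (i + 1) * chunk_size]
        for i in range(n_chunks)
    }

freq = np.arange(8)  # 8 channels, as in the tutorial dataset
chunks = chunk_coord(freq, 3)
print([len(c) for c in chunks.values()])  # [3, 3, 2]
```

This reproduces the 3/3/2 split seen for the 8-channel frequency coordinate in this tutorial.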
Create Node Task Data Mapping
Now, the coordinates in the input data must be mapped onto the parallel_coords. This is achieved using the interpolate_data_coords_onto_parallel_coords function, which produces the node_task_data_mapping: a dictionary where each key is the node id of one of the nodes in the mapping stage of the graph.
Structure of node_task_data_mapping:
node_task_data_mapping = {
    0 : {
        'chunk_indices': tuple of int,
        'parallel_dims': (dim_0, ..., dim_{n_dims-1}),
        'data_selection': {
            dataset_name_0: {
                dim_0: slice,
                ⋮
                dim_{n_dims-1}: slice
            }
            ⋮
            dataset_name_{n_datasets-1}: ...
        }
        'task_coords': {
            dim_0: {
                'data': list/np.ndarray of Number,
                'dims': str,
                'attrs': measure attribute,
            }
            ⋮
            dim_{n_dims-1}: ...
        }
    }
    ⋮
    n_nodes-1 : ...
}
Each node id's dictionary has keys with the following meanings:
chunk_indices: The indices assigned to the data chunks in the parallel_coords. There must be an index for each of the parallel_dims.
parallel_dims: The dimension coordinates over which parallelism will occur.
data_selection: A dictionary where the keys are the names of the datasets in the processing_set, and the values are dictionaries with the coordinates and accompanying slices. If a coordinate is not included, all values will be selected.
task_coords: The chunk of the parallel_coords that is assigned to this node.
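To make the role of data_selection concrete, here is a sketch of how a node task could apply its slices to one dataset's data; the dimension names match the MS v4, but the array and the selection are hypothetical stand-ins:

```python
import numpy as np

# One node's hypothetical data_selection entry for a single dataset;
# a dimension that is absent means "select all values".
data_selection = {"frequency": slice(3, 6)}

# Stand-in visibility data with dims (time, baseline_id, frequency, polarization).
dims = ("time", "baseline_id", "frequency", "polarization")
vis = np.zeros((45, 64, 8, 2), dtype=np.complex64)

# Build an index tuple, defaulting to slice(None) for unselected dims.
index = tuple(data_selection.get(d, slice(None)) for d in dims)
print(vis[index].shape)  # (45, 64, 3, 2)
```

With xarray.Datasets, the same selection can be applied directly with `ds.isel(data_selection)`.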
[8]:
from graphviper.graph_tools.coordinate_utils import (
interpolate_data_coords_onto_parallel_coords,
)
node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
(HTML display of the node_task_data_mapping: nodes 0–2, each with data_selection over the three MS v4 datasets and a frequency task_coords chunk)
Create a Chunk Function and Map Graph
The map function combines a node_task_data_mapping and a node_task to create the map portion of the graph. The node_task must be a function with a single dictionary input and a single output, as is my_func in the example below. The map function will pass the input_params dictionary to the node_task and add the following items from the node_task_data_mapping:
chunk_indices
parallel_dims
data_selection
task_coords
task_id
If local caching is enabled, the following will also be included in the input_params dictionary:
date_time
viper_local_dir
[9]:
from graphviper.graph_tools.map import map
from graphviper.graph_tools.generate_dask_workflow import generate_dask_workflow
import dask
from graphviper.utils.display import dict_to_html
from IPython.display import display, HTML
def my_func(input_params):
    display(HTML(dict_to_html(input_params)))
    print("*" * 30)
    return input_params["test_input"]
input_params = {}
input_params["test_input"] = 42
viper_graph = map(
input_data=ps,
node_task_data_mapping=node_task_data_mapping,
node_task=my_func,
input_params=input_params,
)
dask_graph = generate_dask_workflow(viper_graph)
dask.visualize(dask_graph, filename="map_graph")
[9]:
[10]:
display(HTML(dict_to_html(viper_graph)))
[11]:
dask_graph
[11]:
[Delayed('my_func-60247f6f-43db-4450-93c4-7573d4c4d4df'),
Delayed('my_func-07d2a127-e064-4191-a9e1-a87815b6ba2f'),
Delayed('my_func-59e3d7dc-91de-4bd5-a1df-cb12b6224a56')]
Run Map Graph
[12]:
dask.compute(dask_graph)
(each map node displays its input_params as HTML, followed by a printed separator line)
[12]:
([42, 42, 42],)
Reduce Graph
The reduce function takes the graph created by the map function and adds a reduce graph that combines the outputs using one of two methods:
single_node: the output from all map nodes is sent to a single node.
tree: the outputs are combined using a binary tree reduction.
The function that forms the nodes in the reduce portion of the graph must have two parameters: input_data and input_params. The input_data represents the output from the mapping nodes, while input_params comes from the reduce parameter with the same name.
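To illustrate what a binary tree reduction does, here is a plain-Python sketch (independent of GraphVIPER) that pairwise-combines a list of map outputs until one value remains; the combine function mirrors a sum that adds a test_input of 5 at each combination step:

```python
def tree_reduce(values, combine):
    # Pairwise-combine values level by level; an odd element
    # carries over to the next level unchanged.
    while len(values) > 1:
        paired = [
            combine(values[i], values[i + 1])
            for i in range(0, len(values) - 1, 2)
        ]
        if len(values) % 2 == 1:
            paired.append(values[-1])
        values = paired
    return values[0]

# Three map outputs of 42: 42+42+5 -> 89, then 89+42+5 -> 136.
result = tree_reduce([42, 42, 42], lambda a, b: a + b + 5)
print(result)  # 136
```

The intermediate values 89 and 136 match the printed reduce-node inputs seen when the tree-mode graph is computed later in this tutorial.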
[13]:
# Single Node Reduce
from graphviper.graph_tools import reduce
import numpy as np
def my_sum(graph_inputs, input_params):
    print(graph_inputs)
    # Convert to an ndarray so element-wise division works
    # even if graph_inputs arrives as a plain list.
    return np.sum(np.array(graph_inputs) / input_params["test_input"])
input_params = {}
input_params["test_input"] = 5
viper_graph_reduce = reduce(
viper_graph, my_sum, input_params, mode="single_node"
) # mode "tree","single_node"
print(viper_graph_reduce)
dask_graph_reduce = generate_dask_workflow(viper_graph_reduce)
dask.visualize(dask_graph_reduce)
{'map': {'node_task': <function my_func at 0x7f4d650f13a0>, 'input_params': [{'test_input': 42, 'chunk_indices': (0,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(0, 3, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(0, 3, None)}}, 'task_coords': {'frequency': {'data': array([3.43928097e+11, 3.43939328e+11, 3.43950560e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 0, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (1,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(3, 6, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(3, 6, None)}}, 'task_coords': {'frequency': {'data': array([3.43961791e+11, 3.43973023e+11, 3.43984254e+11]), 'dims': 'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 1, 'input_data': None, 'date_time': None}, {'test_input': 42, 'chunk_indices': (2,), 'parallel_dims': ['frequency'], 'data_selection': {'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_1': {'frequency': slice(6, 8, None)}, 'Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_2': {'frequency': slice(6, 8, None)}}, 'task_coords': {'frequency': {'data': array([3.43995486e+11, 3.44006717e+11]), 'dims': 
'frequency', 'attrs': {'units': 'Hz', 'type': 'spectral_coord', 'velocity_frame': 'lsrk'}}}, 'task_id': 2, 'input_data': None, 'date_time': None}]}, 'reduce': {'mode': 'single_node', 'node_task': <function my_sum at 0x7f4d6448f740>, 'input_params': {'test_input': 5}}}
[13]:
[14]:
# Tree Reduce
from graphviper.graph_tools import reduce
import numpy as np
def my_sum(graph_inputs, input_params):
    print(graph_inputs)
    return np.sum(graph_inputs) + input_params["test_input"]
input_params = {}
input_params["test_input"] = 5
viper_graph_reduce = reduce(
viper_graph, my_sum, input_params, mode="tree"
) # mode "tree","single_node"
dask_graph_reduce = generate_dask_workflow(viper_graph_reduce)
dask.visualize(dask_graph_reduce)
[14]:
[15]:
display(HTML(dict_to_html(viper_graph)))
Run Map Reduce Graph
[16]:
dask.compute(dask_graph_reduce)
(each map node displays its input_params as HTML, followed by a printed separator line)
[42, 42]
[89, 42]
[16]:
(136,)
Overlapping Frequency Map Reduce
Create Parallel Coordinates
[17]:
from graphviper.utils.display import dict_to_html
import dask
dask.config.set(scheduler="synchronous")
from xradio.vis.read_processing_set import read_processing_set
from IPython.display import HTML, display
ps = read_processing_set(
ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
intents=["OBSERVE_TARGET#ON_SOURCE"],
)
ms_xds = ps.get(1)
n_chunks = 3
parallel_coords = {}
freq_coord = ms_xds.frequency.to_dict()
# Here, we create overlapping data chunks. Currently, there is no convenience function available to assist with this.
freq_coord["data_chunks"] = {
0: freq_coord["data"][0:4],
1: freq_coord["data"][3:7],
2: freq_coord["data"][4:8],
}
parallel_coords["frequency"] = freq_coord
display(HTML(dict_to_html(parallel_coords["frequency"])))
(HTML display of the overlapping frequency parallel_coords, including its three overlapping data_chunks)
Create Node Task Data Mapping
[18]:
from graphviper.graph_tools.coordinate_utils import (
interpolate_data_coords_onto_parallel_coords,
)
node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
(HTML display of the node_task_data_mapping: nodes 0–2, each with data_selection over the three MS v4 datasets and an overlapping frequency task_coords chunk)
Map Graph
[19]:
from graphviper.graph_tools.map import map
import dask
from IPython.display import display, HTML
from xradio.vis.read_processing_set import read_processing_set
def my_func(input_params):
    display(HTML(dict_to_html(input_params)))
    print("*" * 30)
    return input_params["test_input"]
input_params = {}
input_params["test_input"] = 42
ps = read_processing_set(
ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
intents=["OBSERVE_TARGET#ON_SOURCE"],
)
viper_graph = map(
input_data=ps,
node_task_data_mapping=node_task_data_mapping,
node_task=my_func,
input_params=input_params,
)
dask_graph = generate_dask_workflow(viper_graph)
dask.visualize(dask_graph, filename="map_graph")
[19]:
Run Map Graph
[20]:
dask.compute(dask_graph)
(each map node displays its input_params as HTML, followed by a printed separator line)
[20]:
([42, 42, 42],)
Baseline and Frequency Map Reduce
Create Parallel Coordinates
[21]:
from graphviper.utils.display import dict_to_html
from graphviper.graph_tools.coordinate_utils import make_parallel_coord
import dask
dask.config.set(scheduler="synchronous")
from xradio.vis.read_processing_set import read_processing_set
from IPython.display import HTML, display
intents = ["OBSERVE_TARGET#ON_SOURCE"]
ps = read_processing_set(
ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
intents=["OBSERVE_TARGET#ON_SOURCE"],
)
ms_xds = ps.get(1)
parallel_coords = {}
import xarray as xr
import numpy as np
n_chunks = 4
parallel_coords["baseline_id"] = make_parallel_coord(
coord=ms_xds.baseline_id, n_chunks=n_chunks
)
n_chunks = 3
parallel_coords["frequency"] = make_parallel_coord(
coord=ms_xds.frequency, n_chunks=n_chunks
)
display(HTML(dict_to_html(parallel_coords)))
(HTML display of parallel_coords with baseline_id split into 4 chunks and frequency into 3 chunks)
Create Node Task Data Mapping
[22]:
from graphviper.graph_tools.coordinate_utils import (
interpolate_data_coords_onto_parallel_coords,
)
node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
(HTML display of the node_task_data_mapping: 12 nodes, one per combination of the 4 baseline_id chunks and 3 frequency chunks)
Map Graph
[23]:
from graphviper.graph_tools.map import map
import dask
from IPython.display import display, HTML
def my_func(input_params):
    display(HTML(dict_to_html(input_params)))
    print("*" * 30)
    return input_params["test_input"]
input_params = {}
input_params["test_input"] = 42
viper_graph = map(
input_data=ps,
node_task_data_mapping=node_task_data_mapping,
node_task=my_func,
input_params=input_params,
)
dask_graph = generate_dask_workflow(viper_graph)
dask.visualize(dask_graph, filename="map_graph")
[23]:
Run Map Graph
[24]:
dask.compute(dask_graph)
(Printed output: each of the 12 node tasks displays its input_params, i.e. the data_selection over the three field datasets and its task_coords baseline_id and frequency, followed by a separator line of asterisks.)
[24]:
([42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42],)
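Conceptually, `map` wraps `my_func` in one `dask.delayed` call per entry of `node_task_data_mapping`, and `generate_dask_workflow` gathers the delayed tasks into a single computable object. A minimal stand-alone sketch of that pattern, not the GraphVIPER internals:

```python
import dask


def node_task(input_params):
    # Stand-in for my_func: just return the test input.
    return input_params["test_input"]


# Toy mapping with the same shape as node_task_data_mapping: 12 node tasks.
node_task_data_mapping = {i: {"test_input": 42} for i in range(12)}

# One delayed call per node task, then a single compute over all of them.
delayed_tasks = [
    dask.delayed(node_task)(params)
    for params in node_task_data_mapping.values()
]
results = dask.compute(delayed_tasks)
print(results)  # ([42, 42, ..., 42],) with 12 entries, mirroring the output above
```

Each delayed call stays lazy until `dask.compute` runs, which is why the twelve `my_func` printouts above only appear when the graph is executed.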
Time Map Reduce
Create Parallel Coordinates
[25]:
from graphviper.graph_tools.coordinate_utils import make_parallel_coord
from graphviper.utils.display import dict_to_html
from xradio.vis.read_processing_set import read_processing_set
from IPython.display import HTML, display
import xarray as xr
import numpy as np
import dask

dask.config.set(scheduler="synchronous")

intents = ["OBSERVE_TARGET#ON_SOURCE"]
ps = read_processing_set(
    ps_store="Antennae_North.cal.lsrk.split.vis.zarr",
    intents=intents,
)
ms_xds = ps.get(1)

# Build a single time coordinate spanning all three field datasets,
# sorted so the chunks are contiguous in time.
t0, t1, t2 = (ps.get(1).time, ps.get(0).time, ps.get(2).time)
time_coord = xr.concat([t0, t1, t2], dim="time").sortby("time").to_dict()

parallel_coords = {}
n_chunks = 4
parallel_coords["time"] = make_parallel_coord(coord=time_coord, n_chunks=n_chunks)
display(HTML(dict_to_html(parallel_coords["time"])))
(HTML rendering of parallel_coords["time"]: data_chunks holding the four time slices, with integration_time attributes.)
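The `parallel_coords["time"]` dictionary holds the concatenated, sorted time axis split into `n_chunks` contiguous slices. A rough sketch of that chunking idea, using `numpy.array_split` as a stand-in for what `make_parallel_coord` produces (the field names here are illustrative, not the exact GraphVIPER schema):

```python
import numpy as np

# Illustrative time axis standing in for the concatenated, sorted coordinate.
time_values = np.linspace(0.0, 90.0, 10)
n_chunks = 4

# Split the index range into n_chunks contiguous pieces, as evenly as possible.
index_chunks = np.array_split(np.arange(len(time_values)), n_chunks)
parallel_time = {
    "data_chunks": {i: time_values[idx] for i, idx in enumerate(index_chunks)},
    "dims": ("time",),
}

print([len(c) for c in parallel_time["data_chunks"].values()])  # [3, 3, 2, 2]
```

With 10 samples and 4 chunks the split is uneven, so the first chunks carry one extra sample each; the real coordinate behaves the same way when its length is not divisible by `n_chunks`.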
Create Node Task Data Mapping
[26]:
from graphviper.graph_tools.coordinate_utils import (
    interpolate_data_coords_onto_parallel_coords,
)

node_task_data_mapping = interpolate_data_coords_onto_parallel_coords(
    parallel_coords, ps
)
display(HTML(dict_to_html(node_task_data_mapping)))
(HTML rendering of node_task_data_mapping: tasks 0 to 3 each list a data_selection over the three field datasets Antennae_North.cal.lsrk.split_ddi_0_intent_OBSERVE_TARGET#ON_SOURCE_field_id_0/1/2 and a time task coordinate with integration_time attributes.)
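`interpolate_data_coords_onto_parallel_coords` works out, for each parallel chunk, which datasets contain samples inside that chunk's coordinate range. A simplified illustration of that matching step, using hypothetical dataset names and a plain dict rather than the actual GraphVIPER data structures:

```python
import numpy as np

# Hypothetical time samples for three datasets in a processing set.
dataset_times = {
    "xds_field_0": np.array([0.0, 1.0, 2.0]),
    "xds_field_1": np.array([2.5, 3.5]),
    "xds_field_2": np.array([5.0, 6.0]),
}

# chunk_id -> (start, end) of a parallel time chunk.
chunks = {0: (0.0, 2.4), 1: (2.4, 4.0), 2: (4.0, 7.0)}

node_task_data_mapping = {}
for chunk_id, (start, end) in chunks.items():
    # Keep only datasets with at least one sample in this chunk,
    # recording the indices of the matching samples.
    selection = {
        name: np.where((t >= start) & (t < end))[0]
        for name, t in dataset_times.items()
        if np.any((t >= start) & (t < end))
    }
    node_task_data_mapping[chunk_id] = {"data_selection": selection}

print(sorted(node_task_data_mapping[1]["data_selection"]))  # ['xds_field_1']
```

In the real mapping every task above selects all three field datasets because their time ranges overlap each chunk; in this toy example each dataset falls into a different chunk, which is the same mechanism at work.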
Map Graph
[27]:
from graphviper.graph_tools.map import map
from graphviper.graph_tools import generate_dask_workflow
from graphviper.utils.display import dict_to_html
import dask
from IPython.display import display, HTML


def my_func(input_params):
    display(HTML(dict_to_html(input_params)))
    print("*" * 30)
    return input_params["test_input"]


# input_params keys available inside a node task:
# ['test_input', 'input_data_name', 'viper_local_dir', 'date_time', 'data_sel', 'chunk_coords', 'chunk_indx', 'chunk_id', 'parallel_dims']
input_params = {}
input_params["test_input"] = 42

viper_graph = map(
    input_data=ps,
    node_task_data_mapping=node_task_data_mapping,
    node_task=my_func,
    input_params=input_params,
)
dask_graph = generate_dask_workflow(viper_graph)
dask.visualize(dask_graph, filename="map_graph")
[27]:
Run Map Graph
[28]:
dask.compute(dask_graph)
(Printed output: each of the 4 node tasks displays its input_params, i.e. the data_selection over the three field datasets and its time task coordinate with integration_time attributes, followed by a separator line of asterisks.)
[28]:
([42, 42, 42, 42],)