I have a parameterized input data class that outputs a loaded model into the next stage of the pipeline:
class InputData(param.Parameterized):
    """Pipeline input stage: select an AdH simulation and load it into an ``AdhViz``.

    The stage displays the simulation, attribute and projection widgets, and
    enables/disables the attribute widgets according to the data found on disk.
    """

    # placeholders for the incoming parameters
    load_sim_widget = param.ClassSelector(class_=LoadAdhSimulation)
    att_widget = param.ClassSelector(class_=Attributes)
    proj_widget = param.ClassSelector(class_=projections.Projection)
    label = param.String(default='Basic Input', precedence=-1)

    def __init__(self, **params):
        super(InputData, self).__init__(**params)
        self.dat_files = []
        self.model = None

    # output the adh_viz object
    @param.output()
    def adh_viz(self):
        """Load the selected simulation and return the resulting ``AdhViz``."""
        return self.load_data()

    @param.depends('load_sim_widget.load_netcdf', 'load_sim_widget.adh_root_filename',
                   'load_sim_widget.adh_directory', watch=True)
    def available_attributes(self):
        """Check and enable the attribute widgets that have data on disk.

        Attributes come either from the variables of ``<root>.nc`` (when netcdf
        loading is selected) or from the ``<root>*.dat`` files in the simulation
        directory. Widgets with matching data are enabled and set to True;
        every other widget (except ``name``) is set to False and made constant.
        A missing netcdf file is reported with a printed message and results in
        no attributes being available.
        """
        att_list = []
        # if netcdf is selected
        if self.load_sim_widget.load_netcdf:
            filename = os.path.join(self.load_sim_widget.adh_directory,
                                    self.load_sim_widget.adh_root_filename + '.nc')
            try:
                # open the xarray dataset (does not load into memory)
                ncdf = xr.open_dataset(filename)
            except FileNotFoundError:
                print('File Not Found: {}'.format(filename))
            else:
                # the context manager closes the dataset on exit, so no
                # explicit close() call is needed afterwards
                with ncdf:
                    # enable and add all variables in the netcdf file
                    for var in ncdf.data_vars:
                        att_name = var.lower().replace(" ", "_")
                        # only keep variables that have results dimensions
                        dims = ncdf[var].dims
                        if 'times' in dims and 'nodes_ids' in dims:
                            att_list.append(att_name)
        # otherwise read from available *.dat files
        else:
            # read in extension dicts & standardize extensions
            ext_to_widget, _widget_to_ext = attribute_dict()
            # move the 'err_hyd' entry to the 'error' key
            ext_to_widget['error'] = ext_to_widget.pop('err_hyd')
            # get the list of filenames # todo this isn't foolproof e.g. `SanDieg` finds files
            glob_items = glob.glob(os.path.join(self.load_sim_widget.adh_directory,
                                                self.load_sim_widget.adh_root_filename + '*.dat'))
            # convert filenames list to attribute list
            att_list = [ext_to_widget[get_variable_from_file_name(filename)]
                        for filename in glob_items]

        # dictionary for naming inconsistencies # todo add complexity for err_con in future
        label_to_widget = {'error': 'error_hydro',
                           'depth-averaged_velocity': 'velocity'}
        # replace each inconsistent label found in the list with its widget name
        for key, widget_name in label_to_widget.items():
            if key in att_list:
                att_list.remove(key)
                att_list.append(widget_name)

        # adjust available attribute widgets based on available data
        widget_params = self.att_widget.params()
        for key in widget_params:
            if key in att_list:
                # data is available: ensure the widget is enabled, then check it
                widget_params[key].constant = False
                setattr(self.att_widget, key, True)
            elif key != 'name':
                # no data: temporarily enable the widget so it can be unchecked,
                # then disable it again
                widget_params[key].constant = False
                setattr(self.att_widget, key, False)
                widget_params[key].constant = True

    def load_data(self):
        """Load the model selected in the widgets and wrap it in a new ``AdhViz``.

        The loaded model is also stored on ``self.model``.
        """
        sim = self.load_sim_widget
        # if file is netcdf
        if sim.load_netcdf:
            self.model, _ = load_model(sim.adh_directory,
                                       project_name=sim.adh_root_filename,
                                       netcdf=sim.load_netcdf)
        # request to load data from *dat files:
        else:
            # get a list of requested suffix strings
            slist = self.att_widget.suffix_list(value=True)
            # construct list of filenames
            fnames = [os.path.join(sim.adh_directory,
                                   sim.adh_root_filename + '_' + x + '.dat')
                      for x in slist]
            # read the requested files
            self.model, _ = load_model(sim.adh_directory,
                                       project_name=sim.adh_root_filename,
                                       netcdf=False, crs=self.proj_widget.get_crs(),
                                       filenames=fnames)

        adh_viz = AdhViz()
        adh_viz.set_model(self.model)
        return adh_viz

    # visualize the page
    def panel(self):
        """Build the page for this stage: the three widget panels in a row."""
        self.available_attributes()  # for the initial load
        return pn.Row(pn.Spacer(height=700), pn.Param(self.load_sim_widget, show_name=False),
                      pn.Param(self.att_widget, show_name=False),
                      pn.Param(self.proj_widget, show_name=False),
                      name=self.label)
Then the next stage of the pipeline should ingest the output from the first and visualize it.
class ViewResults(param.Parameterized):
    """Pipeline viewing stage: show the ``AdhViz`` result on a map with display controls."""

    load_sim_widget = param.ClassSelector(class_=LoadAdhSimulation)
    att_widget = param.ClassSelector(class_=Attributes)
    proj_widget = param.ClassSelector(class_=projections.Projection)
    display_range = param.ClassSelector(class_=display_opts.DisplayRangeOpts)
    cmap_opts = param.ClassSelector(class_=display_opts.ColormapOpts)
    adh_viz = param.ClassSelector(class_=AdhViz)
    wmts_widget = param.ClassSelector(class_=display_opts.WMTS)
    wireframe = param.ClassSelector(class_=display_opts.ShowElements)

    def __init__(self, **params):
        # every parameter the caller did not supply gets a default instance of its class
        for k, p in self.params().items():
            if k in params or k == 'name':
                continue
            params[k] = p.class_()
        super(ViewResults, self).__init__(**params)

        self.annotator = ESAnnotator(path_type=gv.Path,
                                     crs=ccrs.GOOGLE_MERCATOR,
                                     point_columns=['depth_elevation'],
                                     poly_columns=['name']
                                     )
        self.map_pane_ = pn.Spacer(width=0)
        self.analysis_pane_ = pn.Spacer(width=0)
        self.tool_pane_ = pn.Spacer(width=0)

    @property
    def tabs(self):
        """List of ``(title, content)`` pairs for the tool tabs (currently only 'Display')."""
        result = pn.panel(self.adh_viz, parameters=['result_label'], show_name=False)
        disp_tab = pn.Column(self.wmts_widget,
                             pn.Pane(self.cmap_opts, show_name=False),
                             pn.Pane(self.display_range, show_name=False),
                             pn.Pane(self.wireframe),
                             result)
        return [('Display', disp_tab)]

    # what to pass out of this page (for pipeline)
    @param.output()
    def output(self):
        """This stage passes nothing on to later stages."""
        pass

    # how to build this page
    def panel(self):
        """Build the page for this stage from :meth:`run`."""
        return pn.panel(self.run)

    @param.depends('adh_viz.result_label', 'wireframe.mesh_elements')  # never need watch=True
    def run(self):
        """Build the map view, time player and tool tabs for the current result.

        Raises:
            ValueError: if the mesh label is neither ``'scalar'`` nor ``'vector'``.
        """
        # create the meshes for the dynamic map
        meshes = self.adh_viz.create_animation2()

        if self.wireframe.mesh_elements is True:
            # overlay of the mesh elements
            edgepaths_overlay = self.adh_viz.view_elements()
        else:
            # empty placeholder so the overlay composition below stays the same
            edgepaths_overlay = hv.Points(data=[])

        # Define function which applies colormap and color_range
        def apply_opts(obj, colormap, color_range):
            """Apply colormap, fixed 600x600 size and color range; NaN/min values are transparent."""
            return obj.options(cmap=colormap, height=600, width=600).redim.range(
                **{obj.vdims[0].name: color_range}).options(
                clipping_colors={'NaN': 'transparent', 'min': 'transparent'})

        if meshes.label == 'scalar':
            rasterized = rasterize(meshes)
        elif meshes.label == 'vector':
            rasterized = rasterize(vectorfield_to_paths(meshes, color='Magnitude', magnitude='Magnitude',
                                                        scale=0.05),
                                   aggregator='mean', cmap=process_cmap('viridis'), precompute=True)
        else:
            # without this guard the code below would fail with an opaque
            # UnboundLocalError because no plot would have been built
            raise ValueError('Unsupported mesh label {!r}; expected "scalar" or "vector"'
                             .format(meshes.label))

        # Apply the colormap and color range dynamically, then stack the
        # background tiles, annotations and (optional) wireframe on top
        dynamic = hv.util.Dynamic(rasterized, operation=apply_opts,
                                  streams=[Params(self.cmap_opts), Params(self.display_range)]
                                  ) * \
            self.wmts_widget.view() * \
            self.annotator.polys * \
            self.annotator.points * \
            edgepaths_overlay

        time = pn.panel(self.adh_viz, parameters=['time'], widgets={'time': pn.widgets.DiscretePlayer},
                        show_name=False)
        hv_panel = pn.panel(dynamic)
        # only the first element of the HoloViews panel is placed on the page
        map_pane = pn.Column(hv_panel[0], pn.Row(pn.Spacer(width=100), time))
        tool_pane = pn.Tabs(*self.tabs)

        return pn.Row(map_pane, pn.Spacer(width=100, height=900),
                      pn.Column(tool_pane, pn.Pane(LOGO, width=300)))
Here is the dashboard:
def results_dashboard(directory=os.path.join(ROOTDIR, 'data/SanDiego'), rootname='SanDiego'):
    """Assemble the two-stage (Input -> View) results pipeline and return its layout.

    Args:
        directory: directory holding the simulation files.
        rootname: root filename of the simulation.

    Returns:
        The layout of the assembled pipeline.
    """
    # widgets shared by both stages: simulation selection, attributes, projection
    shared_widgets = dict(
        load_sim_widget=LoadAdhSimulation(adh_root_filename=rootname, adh_directory=directory),
        att_widget=Attributes(),
        proj_widget=projections.Projection(name=''),
    )
    # display options used only by the viewing stage
    display_widgets = dict(
        wmts_widget=display_opts.WMTS(name=''),
        display_range=display_opts.DisplayRangeOpts(),
        cmap_opts=display_opts.ColormapOpts(),
        wireframe=display_opts.ShowElements(name=''),
    )

    # the stages of the pipeline, in display order
    input_stage = InputData(**shared_widgets)
    view_stage = ViewResults(**dict(shared_widgets, **display_widgets))
    pipeline = pn.pipeline.Pipeline([('Input', input_stage), ('View', view_stage)], debug=True)

    # the button width is not exposed, so set it on the private widget boxes
    for button_index in (1, 2):
        pipeline.layout[0][button_index]._widget_box.width = 100

    # return a display of the pipeline
    return pipeline.layout
However, when I run this dashboard, the second stage just displays an empty adh_viz object. I have made sure that the data is loaded, but it seems like it's not being passed properly.
I had this code working properly a few days ago, but I made a minor change to load the data EARLIER in the process (should be unrelated to pipeline) and now it doesn't work.
Any ideas on what I'm missing here?
EDITED TO ADD:
When I extract the `results_dashboard` code out of the class and just run it in a Jupyter notebook, I get an error in the console:
Error: Model 'ClearTool' does not exist. This could be due to a widget or a custom model not being registered before first usage.
BUT, if I use the same kernel (don't restart) but rerun the imports, my pipeline will successfully pass data from one stage to the next.
Unfortunately, this doesn't work if I leave the `results_dashboard` code inside of the class.
EDITED AGAIN:
It WILL work with the `results_dashboard` code left inside the class if I add panel, hv, and `hv.extension('bokeh')` to the imports in the notebook and reload them.
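The problem was a race condition with my imports: `hv.extension('bokeh')` was running before everything else had been imported, which matches the "custom model not being registered before first usage" error above. I moved `hv.extension('bokeh')` to be the last import and that fixed it.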