Reference

sparkmon.monitor

Monitor thread.

class sparkmon.monitor.SparkMon(application_or_spark, period=20, callbacks=None, title_prefix='')

Monitor a sparkmon application or a Spark session in a background thread.

Several designs are possible for this class. One option is to use two threads:

  • a daemon thread for the regular updates (it does not block the exit);

  • a non-daemon thread to run the callbacks (it blocks the exit).

In this design, the callbacks must not run in the daemon thread, so that a callback exporting a file is not interrupted in the middle of saving. The trade-offs are:

  + the callbacks can run at a slower pace than the updates;

  - it adds a lot of complexity and creates problems such as race conditions and deadlocks (we used this design in version 0.0.4).

This is why we decided to use a single non-daemon thread that checks the status of the main thread, so that monitoring stops smoothly when the program exits.
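The single-thread design can be sketched with the standard library alone. The class below is a hypothetical, simplified stand-in, not sparkmon's implementation: `MonitorSketch` and its `update` argument are illustrative names, `update` replaces the refresh of the monitored application, and the callbacks take no arguments. It relies on CPython's shutdown order: the main thread is marked as finished before non-daemon threads are joined, so `threading.main_thread().is_alive()` becomes False while this thread can still complete its current iteration.

```python
import threading
from typing import Any, Callable, List, Optional


class MonitorSketch(threading.Thread):
    """Hypothetical monitor: one non-daemon thread that stops on request
    or once the main thread has exited."""

    def __init__(self, update: Callable[[], Any], period: float = 20,
                 callbacks: Optional[List[Callable[[], Any]]] = None) -> None:
        # daemon=False: the interpreter waits for this thread at exit, so a
        # callback that is writing a file is not killed halfway through.
        super().__init__(daemon=False)
        self.update = update
        self.period = period
        self.callbacks = callbacks or []
        self._stop_event = threading.Event()

    def is_main_thread_alive(self) -> bool:
        return threading.main_thread().is_alive()

    def stop(self) -> None:
        # Ask the loop to exit after the current iteration.
        self._stop_event.set()

    def stopped(self) -> bool:
        return self._stop_event.is_set()

    def should_stop(self) -> bool:
        # Stop on explicit request, or when the main program has finished.
        return self.stopped() or not self.is_main_thread_alive()

    def callbacks_run(self) -> None:
        for callback in self.callbacks:
            callback()

    def run(self) -> None:
        while not self.should_stop():
            self.update()
            self.callbacks_run()
            # Sleep for one period, but wake up early if stop() is called.
            self._stop_event.wait(self.period)
```

Because the loop sleeps with `Event.wait(period)`, an explicit `stop()` takes effect almost immediately, whereas the end of the main thread is only noticed at the next wake-up, up to one `period` later.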

Remark: the same application should not be updated by anything else, such as another SparkMon instance, because this could create race conditions.

Parameters
  • application_or_spark (Union[sparkmon.application.Application, pyspark.sql.session.SparkSession]) –

  • period (int) –

  • callbacks (Optional[List[Callable[[...], Any]]]) –

  • title_prefix (str) –

Return type

None

callbacks_run()

Run the callbacks.

is_main_thread_alive()

Check if the main thread is alive.

Return type

bool

live_plot_notebook(n_iter=None)

Useful in the remote case only.

This might not be compatible with callbacks that use matplotlib, because matplotlib is not thread-safe. You can get errors such as:

    python(81469,0x1106c5e00) malloc: Incorrect checksum for freed object 0x7fe18da140a8: probably modified after being freed.
    Corrupt value: 0x230017000b00f005
    python(81469,0x1106c5e00) malloc: *** set a breakpoint in malloc_error_break to debug

Return type

None

run()

Overrides threading.Thread.run().

Return type

None

should_stop()

Check if we should stop the thread loop.

Return type

bool

stop()

Stop running the loop and exit the thread safely.

Return type

None

stopped()

Check if we need to stop.

Return type

bool

sparkmon.callbacks

List of default callbacks.

sparkmon.callbacks.log_stages_to_mlflow(application, directory='sparkmon')

Log stages to mlflow.

Parameters
  • application (sparkmon.application.Application) –

  • directory (str) –

Return type

None

sparkmon.callbacks.log_tasks_to_mlflow(application, directory='sparkmon')

Log tasks to mlflow.

Parameters
  • application (sparkmon.application.Application) –

  • directory (str) –

Return type

None

sparkmon.callbacks.log_timeseries_db_to_mlflow(application, directory='sparkmon')

Log timeseries_db to mlflow.

Parameters
  • application (sparkmon.application.Application) –

  • directory (str) –

Return type

None

sparkmon.callbacks.log_to_mlflow(application, directory='sparkmon')

Log to mlflow.

Parameters
  • application (sparkmon.application.Application) –

  • directory (str) –

Return type

None

sparkmon.callbacks.plot_to_image(application, path='sparkmon.png')

Plot and save to image.

Not compatible with live_plot_notebook().

Parameters
  • application (sparkmon.application.Application) –

  • path (str) –

Return type

None

sparkmon.callbacks.plot_to_mlflow(application, directory='sparkmon')

Log image to mlflow.

Not compatible with live_plot_notebook().

Parameters
  • application (sparkmon.application.Application) –

  • directory (str) –

Return type

None
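All default callbacks above take the application as their first parameter, followed by options with defaults (`directory`, `path`). Assuming, from these signatures, that the monitor calls each callback with the application as its only positional argument, options can be bound in advance with `functools.partial`. The sketch below is self-contained: `FakeApplication` and `record_metrics` are hypothetical stand-ins, not part of sparkmon.

```python
import functools
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class FakeApplication:
    """Hypothetical stand-in for sparkmon.application.Application."""
    metrics: Dict[str, float] = field(default_factory=dict)


def record_metrics(application: FakeApplication, sink: List[str],
                   prefix: str = "") -> None:
    """Hypothetical custom callback: application first, options after."""
    for name, value in sorted(application.metrics.items()):
        sink.append(f"{prefix}{name}={value}")


lines: List[str] = []
# Bind every option except `application`, leaving a one-argument callable.
callbacks: List[Callable[[Any], Any]] = [
    functools.partial(record_metrics, sink=lines, prefix="run1/"),
]

app = FakeApplication(metrics={"memory_used": 2.0, "cpu": 0.5})
# What the monitor loop is assumed to do at each period.
for callback in callbacks:
    callback(app)
```

With the real package, the same pattern would presumably look like `functools.partial(plot_to_image, path="run1.png")` passed in the `callbacks` list of SparkMon; that combination is inferred from the signatures above, not tested here.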

sparkmon.plotting

Plotting utilities.

sparkmon.plotting.ax_convert_size(ax)

Convert byte size for displaying tick values.

Parameters

ax (matplotlib.axes._axes.Axes) –

sparkmon.plotting.mmm_plot(ax, value, timeseries_db_df, title_prefix, pct=False, ylim=(0, None))

Helper function to plot the aggregations of a metric.

Parameters
  • ax (matplotlib.axes._axes.Axes) –

  • value (str) –

  • timeseries_db_df (pandas.core.frame.DataFrame) –

  • title_prefix (str) –

  • pct (bool) –

  • ylim (Any) –

Return type

None

sparkmon.plotting.plot_max_value(ax, s, string_rep=<function <lambda>>)

Helper function to add the maximum value on the right side of a plot.

Parameters
  • ax (matplotlib.axes._axes.Axes) –

  • s (Any) –

  • string_rep (Callable) –

Return type

None

sparkmon.plotting.plot_notebook(application)

Plot for notebook.

Parameters

application (sparkmon.application.Application) –

Return type

None

sparkmon.plotting.plot_timeseries(timeseries_db_df, title=None)

Plot timeseries DB.

Parameters
  • timeseries_db_df (pandas.core.frame.DataFrame) –

  • title (Optional[str]) –

Return type

matplotlib.figure.Figure

sparkmon.plotting.prepare_axis(ax)

Prepare the subplot ax so that there is no whitespace before and after the plot lines.

Call this before any plotting.

Parameters

ax (matplotlib.axes._axes.Axes) –

Return type

None

sparkmon.utils

Utility functions.

sparkmon.utils.convert_size(size_bytes)

Convert a size in bytes to a human-readable string.

Credit: https://stackoverflow.com/a/14822210/1876485

Return type

str
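The idea behind the cited answer is to divide by 1024 until the value fits the next unit. The sketch below follows that idea with a plain loop instead of a logarithm, which avoids floating-point rounding at exact powers of 1024. The unit labels and the rounding to two decimals are assumptions, so sparkmon's exact output format may differ.

```python
UNITS = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")


def convert_size(size_bytes: float) -> str:
    """Sketch: format a byte count with a binary (1024-based) unit."""
    value = float(size_bytes)
    i = 0
    # Move to the next unit while the value is at least 1024 of the current one.
    while abs(value) >= 1024 and i < len(UNITS) - 1:
        value /= 1024
        i += 1
    return f"{round(value, 2)} {UNITS[i]}"
```

For example, `convert_size(1536)` gives `"1.5 KB"` and `convert_size(1024**3)` gives `"1.0 GB"`.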

sparkmon.utils.flatten_dict(d, parent_key='', sep='_')

Flatten a nested dict recursively.

Credit: https://stackoverflow.com/a/6027615/1876485
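A minimal sketch of the recursive approach from the cited answer, using the same signature as above: each nested key is prefixed with its parent's key, joined by `sep`. Converting non-string top-level keys with `str()` is an addition of this sketch, not necessarily what sparkmon does.

```python
from collections.abc import Mapping
from typing import Any, Dict


def flatten_dict(d: Mapping, parent_key: str = "", sep: str = "_") -> Dict[str, Any]:
    """Flatten nested mappings, joining the keys of each path with `sep`."""
    items: Dict[str, Any] = {}
    for key, value in d.items():
        # Prefix the key with its parent's key, except at the top level.
        new_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, Mapping):
            # Recurse into nested mappings and merge their flattened items.
            items.update(flatten_dict(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items
```

For example, `flatten_dict({"a": 1, "b": {"c": 2, "d": {"e": 3}}})` returns `{"a": 1, "b_c": 2, "b_d_e": 3}`.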

sparkmon.utils.get_memory_process()

Get the memory usage, in bytes, of the current Python process.

Return type

Any

sparkmon.utils.get_memory_user()

Get the memory usage, in bytes, of all processes of the current user.

Return type

Any

sparkmon.__main__

Command-line interface.