Revert "Revert "Revert "Add more observability to Ray Data operator metrics (#14)""" #28

Draft. Wants to merge 1 commit into base: pinterest/main-2.10.0.
dashboard/modules/metrics/dashboards/data_dashboard_panels.py (1 addition, 29 deletions)
@@ -388,34 +388,6 @@
fill=0,
stack=True,
),
Panel(
id=38,
title="In-Task Backpressure Time",
description="Time spent within a running task in backpressure.",
unit="seconds",
targets=[
Target(
expr="sum(ray_data_in_task_backpressure_time{{{global_filters}}}) by (dataset, operator)",
legend="In-Task Backpressure Time: {{dataset}}, {{operator}}",
)
],
fill=0,
stack=True,
),
Panel(
id=39,
title="Task CPU Time",
description="Time spent using CPU within a running task.",
unit="seconds",
targets=[
Target(
expr="sum(ray_data_task_cpu_time{{{global_filters}}}) by (dataset, operator)",
legend="Task CPU Time: {{dataset}}, {{operator}}",
)
],
fill=0,
stack=True,
),
# Ray Data Metrics (Object Store Memory)
Panel(
id=13,
@@ -576,4 +548,4 @@
panels=DATA_GRAFANA_PANELS,
standard_global_filters=['dataset=~"$DatasetID"', 'SessionName=~"$SessionName"'],
base_json_file_name="data_grafana_dashboard_base.json",
)
)
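As a side note on how the panel targets above resolve: each `expr` is a template whose `{global_filters}` placeholder is filled from the dashboard config's `standard_global_filters` shown just above. A minimal sketch of that expansion; the comma separator is an assumption about how the dashboard generator joins the filters:

```python
# Sketch only: shows how a Target expr template expands once the dashboard's
# global filters are substituted. The join separator is assumed.
expr_template = (
    "sum(ray_data_task_cpu_time{{{global_filters}}}) by (dataset, operator)"
)
global_filters = ",".join(['dataset=~"$DatasetID"', 'SessionName=~"$SessionName"'])

print(expr_template.format(global_filters=global_filters))
# sum(ray_data_task_cpu_time{dataset=~"$DatasetID",SessionName=~"$SessionName"}) by (dataset, operator)
```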
@@ -208,22 +208,6 @@ class OpRuntimeMetrics:
"metrics_group": "tasks",
},
)
task_cpu_time: float = field(
default=0,
metadata={
"description": "Time actively using CPU within tasks",
"metrics_group": "tasks",
"map_only": True,
},
)
in_task_backpressure_time: float = field(
default=0,
metadata={
"description": "Time spent waiting idly on generator outputs to be yielded within tasks",
"metrics_group": "tasks",
"map_only": True,
},
)

# === Object store memory metrics ===
obj_store_mem_internal_inqueue_blocks: int = field(
@@ -289,6 +273,7 @@
"metrics_group": "object_store_memory",
},
)

# === Miscellaneous metrics ===
# Use "metrics_group: "misc" in the metadata for new metrics in this section.

@@ -500,9 +485,6 @@ def on_task_output_generated(self, task_index: int, output: RefBundle):
for block_ref, meta in output.blocks:
assert meta.exec_stats and meta.exec_stats.wall_time_s
self.block_generation_time += meta.exec_stats.wall_time_s
if meta.exec_stats.backpressure_time:
self.in_task_backpressure_time += meta.exec_stats.backpressure_time
self.task_cpu_time += meta.exec_stats.cpu_time_s
assert meta.num_rows is not None
self.rows_task_outputs_generated += meta.num_rows
trace_allocation(block_ref, "operator_output")
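For readers skimming the revert: the `task_cpu_time` and `in_task_backpressure_time` fields removed above follow the same pattern as the surviving `OpRuntimeMetrics` entries, where each dataclass field carries its description and metrics group in field metadata that a stats layer can later read to create one gauge per metric. A minimal, self-contained sketch of that metadata-driven pattern (the class and loop below are illustrative, not the actual Ray internals):

```python
from dataclasses import dataclass, field, fields

@dataclass
class ExampleMetrics:
    # Illustrative counterpart of an OpRuntimeMetrics-style entry.
    task_cpu_time: float = field(
        default=0,
        metadata={
            "description": "Time actively using CPU within tasks",
            "metrics_group": "tasks",
        },
    )

# Iterating over the fields' metadata is what lets a stats layer build one
# Prometheus gauge per metric without listing each metric by hand.
for f in fields(ExampleMetrics):
    print(f.name, f.metadata["metrics_group"], f.metadata["description"])
```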
python/ray/data/_internal/execution/operators/map_operator.py (0 additions, 3 deletions)
@@ -6,7 +6,6 @@
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union

import ray
import time
from ray import ObjectRef
from ray._raylet import ObjectRefGenerator
from ray.data._internal.compute import (
@@ -423,11 +422,9 @@ def _map_task(
m_out.exec_stats = stats.build()
m_out.exec_stats.udf_time_s = map_transformer.udf_time()
m_out.exec_stats.task_idx = ctx.task_idx
finish_time = time.perf_counter()
yield b_out
yield m_out
stats = BlockExecStats.builder()
stats.prev_map_task_finish_time = finish_time


class _BlockRefBundler:
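The three deletions in this file, together with the block.py changes further down, undo the in-task backpressure measurement: a timestamp taken just before yielding a block was later subtracted from the next stats builder's start time. A standalone sketch of that timing pattern under plain generator semantics (function and variable names are illustrative, not the Ray internals):

```python
import time
from typing import Iterable, Iterator

def produce_blocks(blocks: Iterable[bytes]) -> Iterator[bytes]:
    """Yield blocks while estimating how long each yield leaves the producer paused."""
    for block in blocks:
        before_yield = time.perf_counter()
        yield block
        # The generator only resumes when the consumer asks for the next block,
        # so this delta approximates time spent blocked on the consumer
        # (the "in-task backpressure" the reverted metric tried to capture).
        backpressure_s = time.perf_counter() - before_yield
        print(f"paused at yield for {backpressure_s:.6f}s")

# A slow consumer shows up as backpressure on the producer side.
for _ in produce_blocks([b"a", b"b", b"c"]):
    time.sleep(0.01)
```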
python/ray/data/_internal/stats.py (10 additions, 2 deletions)
@@ -233,7 +233,7 @@ def __init__(self, max_stats=1000):
# Object store memory-related metrics
self.execution_metrics_obj_store_memory = (
self._create_prometheus_metrics_for_execution_metrics(
metrics_group="object_store_memory",
metrics_group="obj_store_memory",
tag_keys=op_tags_keys,
)
)
@@ -280,7 +280,7 @@ def __init__(self, max_stats=1000):
)
self.iter_next_batch_s = Gauge(
"data_iter_next_batch_seconds",
description="Seconds spent getting next batch",
description="Seconds spent getting next batch", # Need a better description for this?
tag_keys=iter_tag_keys,
)
self.iter_format_batch_s = Gauge(
@@ -309,6 +309,7 @@
description="Number of blocks in remote nodes",
tag_keys=iter_tag_keys,
)

self.iter_blocks_unknown = Gauge(
"data_iter_blocks_unknown",
description="Number of blocks with unknown location",
@@ -321,6 +322,13 @@
tag_keys=iter_tag_keys,
)


self.streaming_exec_schedule_s = Gauge(
"data_streaming_exec_schedule_seconds",
description="Seconds spent streaming executor scheduling",
tag_keys=iter_tag_keys,
)

def _create_prometheus_metrics_for_execution_metrics(
self, metrics_group: str, tag_keys: Tuple[str, ...]
) -> Dict[str, Gauge]:
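The `Gauge` objects created in this file come from Ray's application-level metrics API (assumed here to be `ray.util.metrics.Gauge`, as in upstream Ray). A minimal sketch of defining and updating a gauge in the same shape as the new `data_streaming_exec_schedule_seconds` entry; the tag keys and the reported value are illustrative:

```python
import ray
from ray.util.metrics import Gauge

ray.init()  # metrics are exported once a Ray worker/driver is running

# Same shape as the gauge added in stats.py; tag keys are illustrative here.
streaming_exec_schedule_s = Gauge(
    "data_streaming_exec_schedule_seconds",
    description="Seconds spent streaming executor scheduling",
    tag_keys=("dataset",),
)

# Record one scheduling-loop duration for a given dataset.
streaming_exec_schedule_s.set(0.042, tags={"dataset": "dataset_0"})
```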
python/ray/data/block.py (0 additions, 4 deletions)
@@ -135,7 +135,6 @@ def __init__(self):
self.wall_time_s: Optional[float] = None
self.udf_time_s: Optional[float] = 0
self.cpu_time_s: Optional[float] = None
self.backpressure_time: Optional[float] = None
self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
# Max memory usage. May be an overestimate since we do not
# differentiate from previous tasks on the same worker.
@@ -167,7 +166,6 @@ class _BlockExecStatsBuilder:
def __init__(self):
self.start_time = time.perf_counter()
self.start_cpu = time.process_time()
self.prev_map_task_finish_time = None

def build(self) -> "BlockExecStats":
self.end_time = time.perf_counter()
@@ -178,8 +176,6 @@ def build(self) -> "BlockExecStats":
stats.end_time_s = self.end_time
stats.wall_time_s = self.end_time - self.start_time
stats.cpu_time_s = self.end_cpu - self.start_cpu
if self.prev_map_task_finish_time:
stats.backpressure_time = self.start_time - self.prev_map_task_finish_time
if resource is None:
# NOTE(swang): resource package is not supported on Windows. This
# is only the memory usage at the end of the task, not the peak
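After the revert, the stats builder shown above goes back to tracking only wall-clock and CPU time between its construction and `build()`. A minimal standalone sketch of that pattern (the class name and the timed workload are illustrative, not the Ray implementation):

```python
import time

class ExecStatsBuilder:
    """Illustrative stand-in for _BlockExecStatsBuilder-style timing."""

    def __init__(self):
        self.start_time = time.perf_counter()  # wall-clock reference
        self.start_cpu = time.process_time()   # CPU-time reference

    def build(self) -> dict:
        return {
            "wall_time_s": time.perf_counter() - self.start_time,
            "cpu_time_s": time.process_time() - self.start_cpu,
        }

builder = ExecStatsBuilder()
sum(i * i for i in range(100_000))  # some work to time
print(builder.build())
```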