diff --git a/dashboard/modules/metrics/dashboards/data_dashboard_panels.py b/dashboard/modules/metrics/dashboards/data_dashboard_panels.py
index f91ede06e76ae..3bee67b323359 100644
--- a/dashboard/modules/metrics/dashboards/data_dashboard_panels.py
+++ b/dashboard/modules/metrics/dashboards/data_dashboard_panels.py
@@ -388,34 +388,6 @@
         fill=0,
         stack=True,
     ),
-    Panel(
-        id=38,
-        title="In-Task Backpressure Time",
-        description="Time spent within a running task in backpressure.",
-        unit="seconds",
-        targets=[
-            Target(
-                expr="sum(ray_data_in_task_backpressure_time{{{global_filters}}}) by (dataset, operator)",
-                legend="In-Task Backpressure Time: {{dataset}}, {{operator}}",
-            )
-        ],
-        fill=0,
-        stack=True,
-    ),
-    Panel(
-        id=39,
-        title="Task CPU Time",
-        description="Time spent using CPU within a running task.",
-        unit="seconds",
-        targets=[
-            Target(
-                expr="sum(ray_data_task_cpu_time{{{global_filters}}}) by (dataset, operator)",
-                legend="Task CPU Time: {{dataset}}, {{operator}}",
-            )
-        ],
-        fill=0,
-        stack=True,
-    ),
     # Ray Data Metrics (Object Store Memory)
     Panel(
         id=13,
@@ -576,4 +548,4 @@
     panels=DATA_GRAFANA_PANELS,
     standard_global_filters=['dataset=~"$DatasetID"', 'SessionName=~"$SessionName"'],
     base_json_file_name="data_grafana_dashboard_base.json",
-)
\ No newline at end of file
+)
diff --git a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
index 26fba15685130..5ea4f4cdc71e8 100644
--- a/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
+++ b/python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py
@@ -208,22 +208,6 @@ class OpRuntimeMetrics:
             "metrics_group": "tasks",
         },
     )
-    task_cpu_time: float = field(
-        default=0,
-        metadata={
-            "description": "Time actively using CPU within tasks",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
-    )
-    in_task_backpressure_time: float = field(
-        default=0,
-        metadata={
-            "description": "Time spent waiting idly on generator outputs to be yielded within tasks",
-            "metrics_group": "tasks",
-            "map_only": True,
-        },
-    )

     # === Object store memory metrics ===
     obj_store_mem_internal_inqueue_blocks: int = field(
@@ -289,6 +273,7 @@ class OpRuntimeMetrics:
             "metrics_group": "object_store_memory",
         },
     )
+
     # === Miscellaneous metrics ===
     # Use "metrics_group: "misc" in the metadata for new metrics in this section.

@@ -500,9 +485,6 @@ def on_task_output_generated(self, task_index: int, output: RefBundle):
         for block_ref, meta in output.blocks:
             assert meta.exec_stats and meta.exec_stats.wall_time_s
             self.block_generation_time += meta.exec_stats.wall_time_s
-            if meta.exec_stats.backpressure_time:
-                self.in_task_backpressure_time += meta.exec_stats.backpressure_time
-            self.task_cpu_time += meta.exec_stats.cpu_time_s
             assert meta.num_rows is not None
             self.rows_task_outputs_generated += meta.num_rows
             trace_allocation(block_ref, "operator_output")
diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py
index 6f9992faf5304..069e5ff306c6e 100644
--- a/python/ray/data/_internal/execution/operators/map_operator.py
+++ b/python/ray/data/_internal/execution/operators/map_operator.py
@@ -6,7 +6,6 @@
 from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union

 import ray
-import time
 from ray import ObjectRef
 from ray._raylet import ObjectRefGenerator
 from ray.data._internal.compute import (
@@ -423,11 +422,9 @@ def _map_task(
             m_out.exec_stats = stats.build()
             m_out.exec_stats.udf_time_s = map_transformer.udf_time()
             m_out.exec_stats.task_idx = ctx.task_idx
-            finish_time = time.perf_counter()
             yield b_out
             yield m_out
             stats = BlockExecStats.builder()
-            stats.prev_map_task_finish_time = finish_time


 class _BlockRefBundler:
diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index 0debf89338489..caa81bd6d5132 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -233,7 +233,7 @@ def __init__(self, max_stats=1000):
         # Object store memory-related metrics
         self.execution_metrics_obj_store_memory = (
             self._create_prometheus_metrics_for_execution_metrics(
-                metrics_group="object_store_memory",
+                metrics_group="obj_store_memory",
                 tag_keys=op_tags_keys,
             )
         )
@@ -280,7 +280,7 @@ def __init__(self, max_stats=1000):
         )
         self.iter_next_batch_s = Gauge(
             "data_iter_next_batch_seconds",
-            description="Seconds spent getting next batch",
+            description="Seconds spent getting next batch",  # Need a better description for this?
             tag_keys=iter_tag_keys,
         )
         self.iter_format_batch_s = Gauge(
@@ -309,6 +309,7 @@ def __init__(self, max_stats=1000):
             description="Number of blocks in remote nodes",
             tag_keys=iter_tag_keys,
         )
+
         self.iter_blocks_unknown = Gauge(
             "data_iter_blocks_unknown",
             description="Number of blocks with unknown location",
@@ -321,6 +322,13 @@ def __init__(self, max_stats=1000):
             tag_keys=iter_tag_keys,
         )

+
+        self.streaming_exec_schedule_s = Gauge(
+            "data_streaming_exec_schedule_seconds",
+            description="Seconds spent streaming executor scheduling",
+            tag_keys=iter_tag_keys,
+        )
+
     def _create_prometheus_metrics_for_execution_metrics(
         self, metrics_group: str, tag_keys: Tuple[str, ...]
     ) -> Dict[str, Gauge]:
diff --git a/python/ray/data/block.py b/python/ray/data/block.py
index 39c9305809719..308b2e0624040 100644
--- a/python/ray/data/block.py
+++ b/python/ray/data/block.py
@@ -135,7 +135,6 @@ def __init__(self):
         self.wall_time_s: Optional[float] = None
         self.udf_time_s: Optional[float] = 0
         self.cpu_time_s: Optional[float] = None
-        self.backpressure_time: Optional[float] = None
         self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
         # Max memory usage. May be an overestimate since we do not
         # differentiate from previous tasks on the same worker.
@@ -167,7 +166,6 @@ class _BlockExecStatsBuilder:
     def __init__(self):
         self.start_time = time.perf_counter()
         self.start_cpu = time.process_time()
-        self.prev_map_task_finish_time = None

     def build(self) -> "BlockExecStats":
         self.end_time = time.perf_counter()
@@ -178,8 +176,6 @@ def build(self) -> "BlockExecStats":
         stats.end_time_s = self.end_time
         stats.wall_time_s = self.end_time - self.start_time
         stats.cpu_time_s = self.end_cpu - self.start_cpu
-        if self.prev_map_task_finish_time:
-            stats.backpressure_time = self.start_time - self.prev_map_task_finish_time
         if resource is None:
             # NOTE(swang): resource package is not supported on Windows. This
             # is only the memory usage at the end of the task, not the peak