diff --git a/treccovid_semantic_search/index-1M.json b/treccovid_semantic_search/index-1M.json
new file mode 100644
index 00000000..e1489ebb
--- /dev/null
+++ b/treccovid_semantic_search/index-1M.json
@@ -0,0 +1,45 @@
+{
+  "settings": {
+    "index.number_of_shards": {{number_of_shards | default(1)}},
+    "index.number_of_replicas": {{number_of_replicas | default(0)}},
+    "index.queries.cache.enabled": {{query_cache_enabled | default(false) | tojson}},
+    "index.requests.cache.enable": {{requests_cache_enabled | default(false) | tojson}},
+    "index.merge.policy.max_merged_segment": "100GB",
+    "index.knn": true,
+    "default_pipeline": "nlp-ingest-pipeline"
+  },
+  "mappings": {
+    "dynamic": "true",
+    "_source": {
+      "enabled": {{ source_enabled | default(true) | tojson }}
+    },
+    "properties": {
+      "title": {
+        "type": "text"
+      },
+      "text": {
+        "type": "text"
+      },
+      "metadata": {
+        "type": "nested",
+        "properties": {
+          "url": {
+            "type": "text"
+          },
+          "pubmed_id": {
+            "type": "integer"
+          }
+        }
+      },
+      "passage_embedding": {
+        "type": "knn_vector",
+        "dimension": 768,
+        "method": {
+          "name": "hnsw",
+          "space_type": "innerproduct",
+          "engine": "faiss"
+        }
+      }
+    }
+  }
+}
diff --git a/treccovid_semantic_search/operations/default.json b/treccovid_semantic_search/operations/default.json
index ac3a65e7..0dbf85cf 100644
--- a/treccovid_semantic_search/operations/default.json
+++ b/treccovid_semantic_search/operations/default.json
@@ -48,6 +48,26 @@
       }
     }
   },
+  {
+    "name": "create-normalization-processor-search-pipeline",
+    "operation-type": "create-search-pipeline",
+    "id": "nlp-normalization-search-pipeline",
+    "body": {
+      "description": "Post processor for hybrid search with min_max normalization and arithmetic_mean combination",
+      "phase_results_processors": [
+        {
+          "normalization-processor": {
+            "normalization": {
+              "technique": "min_max"
+            },
+            "combination": {
+              "technique": "arithmetic_mean"
+            }
+          }
+        }
+      ]
+    }
+  },
   {
     "name": "semantic-search",
     "operation-type": "search",
@@ -69,4 +89,62 @@
         }
       }
     }
+  },
+  {
+    "name": "semantic-search-neural",
+    "operation-type": "search",
+    "variable-queries": {{variable_queries | default(0)}},
+    "param-source": "semantic-search-neural-source",
+    "body": {
+      "_source": {
+        "excludes": [
+          "passage_embedding"
+        ]
+      },
+      "query": {
+        "neural": {
+          "passage_embedding": {
+            "query_text": "what types of rapid testing for Covid-19 have been developed?",
+            "model_id": "",
+            "k": {{k | default(10)}}
+          }
+        }
+      }
+    }
+  },
+  {
+    "name": "hybrid-search-bm25-neural",
+    "operation-type": "search",
+    "request-params": {
+      "search_pipeline": "nlp-normalization-search-pipeline"
+    },
+    "variable-queries": {{variable_queries | default(0)}},
+    "param-source": "hybrid-query-bm25-neural-search-source",
+    "body": {
+      "_source": {
+        "excludes": [
+          "passage_embedding"
+        ]
+      },
+      "query": {
+        "hybrid": {
+          "queries": [
+            {
+              "match": {
+                "title": ""
+              }
+            },
+            {
+              "neural": {
+                "passage_embedding": {
+                  "query_text": "what types of rapid testing for Covid-19 have been developed?",
+                  "model_id": "",
+                  "k": {{k | default(10)}}
+                }
+              }
+            }
+          ]
+        }
+      }
+    }
   }
diff --git a/treccovid_semantic_search/params.json b/treccovid_semantic_search/params.json
new file mode 100644
index 00000000..7f17ce4f
--- /dev/null
+++ b/treccovid_semantic_search/params.json
@@ -0,0 +1,13 @@
+{
+  "bulk_indexing_clients": 4,
+  "bulk_size": 200,
+  "number_of_replicas": 1,
+  "number_of_shards": 8,
+  "ingest_percentage": 2,
+  "search_clients": 8,
+  "warmup_iterations": 20,
+  "iterations": 100,
+  "k": 100,
+  "corpus_size": "1M",
"variable_queries": 10 +} diff --git a/treccovid_semantic_search/test_procedures/default.json b/treccovid_semantic_search/test_procedures/default.json index 12c1f675..d80fb0d7 100644 --- a/treccovid_semantic_search/test_procedures/default.json +++ b/treccovid_semantic_search/test_procedures/default.json @@ -12,7 +12,6 @@ "plugins": { "ml_commons": { "only_run_on_ml_node": "false", - "native_memory_threshold": "99", "allow_registering_model_via_local_file": "true", "allow_registering_model_via_url": "true" } @@ -109,4 +108,133 @@ "clients": {{ search_clients | default(1)}} } ] + }, + { + "name": "index-merge-search-vector", + "description": "Indexes the corpus with vector embedding and then runs queries with vector embedding.", + "default": false, + "schedule": [ + { + "name": "cluster-settings", + "operation": { + "operation-type": "put-settings", + "body": { + "persistent": { + "plugins": { + "ml_commons": { + "only_run_on_ml_node": "false", + "native_memory_threshold": "99", + "allow_registering_model_via_local_file": "true", + "allow_registering_model_via_url": "true" + } + } + } + } + } + }, + { + "operation": "delete-index" + }, + { + "operation": "delete-ingest-pipeline" + }, + { + "operation": { + "operation-type": "delete-ml-model", + "model-name": "{{ model_name | default('huggingface/sentence-transformers/all-mpnet-base-v2')}}" + } + }, + { + "operation": { + "operation-type": "register-ml-model", + "model-name": "{{ model_name | default('huggingface/sentence-transformers/all-mpnet-base-v2')}}", + "model-version": "{{ model_version | default('1.0.1') }}", + "model-format": "{{ model_format | default('TORCH_SCRIPT') }}", + "model-config-file": "{{ model_config_file | default('') }}" + } + }, + { + "operation": "deploy-ml-model" + }, + { + "operation": "create-ingest-pipeline" + }, + { + "operation": { + "operation-type": "create-index", + "settings": {%- if index_settings is defined %} {{index_settings | tojson}} {%- else %} { + "index.refresh_interval": "5s", + "index.translog.flush_threshold_size": "1g" + }{%- endif %} + } + }, + { + "name": "check-cluster-health", + "operation": { + "operation-type": "cluster-health", + "index": "treccovid1m", + "request-params": { + "wait_for_status": "{{cluster_health | default('green')}}", + "wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + }, + { + "operation": "index-append", + "warmup-time-period": 60, + "clients": {{bulk_indexing_clients | default(1)}}, + "ignore-response-error-level": "{{error_level | default('non-fatal')}}" + }, + { + "name": "refresh-after-index", + "operation": "refresh" + }, + { + "operation": { + "operation-type": "force-merge", + "request-timeout": 7200{%- if force_merge_max_num_segments is defined %}, + "max-num-segments": {{ force_merge_max_num_segments | tojson }} + {%- endif %} + } + }, + { + "name": "refresh-after-force-merge", + "operation": "refresh" + }, + { + "operation": "wait-until-merges-finish" + } + ] + }, + { + "name": "run-search-semantic", + "description": "Runs search workload for experiment 3: neural query search", + "default": false, + "schedule": [ + { + "name": "check-cluster-health-before-index-creation", + "operation": { + "operation-type": "cluster-health", + "index": "treccovid1m", + "request-params": { + "wait_for_status": "{{cluster_health | default('green')}}", + "wait_for_no_relocating_shards": "true" + }, + "retry-until-success": true + } + }, + { + "operation": "semantic-search-neural", + "warmup-iterations": {{warmup_iterations | default(50) | 
+        "iterations": {{iterations | default(100) | tojson }},
+        "clients": {{ search_clients | default(1)}}
+      },
+      {
+        "operation": "hybrid-search-bm25-neural",
+        "warmup-iterations": {{warmup_iterations | default(50) | tojson}},
+        "iterations": {{iterations | default(100) | tojson }},
+        "clients": {{ search_clients | default(1)}}
+      }
+    ]
+  }
diff --git a/treccovid_semantic_search/workload.json b/treccovid_semantic_search/workload.json
index 761d1d0e..92cedd57 100644
--- a/treccovid_semantic_search/workload.json
+++ b/treccovid_semantic_search/workload.json
@@ -1,15 +1,25 @@
 {% import "benchmark.helpers" as benchmark with context %}
-
 {
   "version": 2,
   "description": "Trec-Covid is a dataset collection of documents about COVID-19 information.",
   "indices": [
+    {% if not corpus_size %}
+    {% set corpus_size = '100' %}
+    {% endif %}
+    {% if corpus_size == '100' %}
     {
       "name": "treccovid",
       "body": "index.json"
     }
+    {% elif corpus_size == '1M' %}
+    {
+      "name": "treccovid1m",
+      "body": "index-1M.json"
+    }
+    {% endif %}
   ],
   "corpora": [
+    {% if corpus_size == '100' %}
     {
       "name": "treccovid",
       "base-url": "https://dbyiw3u3rf9yr.cloudfront.net/corpora/treccovid",
@@ -18,10 +28,24 @@
       "documents": [
         {
           "source-file": "documents.json.bz2",
           "document-count": 129192,
           "compressed-bytes": 51187469,
-          "uncompressed-bytes": 211980208
+          "uncompressed-bytes": 211980208,
+          "target-index": "treccovid"
+        }
+      ]
+    }
+    {% elif corpus_size == '1M' %}
+    {
+      "name": "treccovid1m",
+      "base-url": "https://github.com/martin-gaievski/neural-search/releases/download/trec_covid_dataset_1m_v2",
+      "documents": [
+        {
+          "source-file": "documents.json.zip",
+          "document-count": 1027986,
+          "target-index": "treccovid1m"
+        }
+      ]
+    }
+    {% endif %}
   ],
   "operations": [
     {{ benchmark.collect(parts="operations/*.json") }}
diff --git a/treccovid_semantic_search/workload.py b/treccovid_semantic_search/workload.py
index 1eaa0436..50712aea 100644
--- a/treccovid_semantic_search/workload.py
+++ b/treccovid_semantic_search/workload.py
@@ -70,6 +70,117 @@ def params(self):
             params['body']['query']['neural']['passage_embedding']['query_text'] = query_text
         return params
 
+class QueryParamSourceNeural:
+    def __init__(self, workload, params, **kwargs):
+        if len(workload.indices) == 1:
+            index = workload.indices[0].name
+            if len(workload.indices[0].types) == 1:
+                type = workload.indices[0].types[0].name
+            else:
+                type = None
+        else:
+            index = "_all"
+            type = None
+
+        self._params = params
+        self._params['index'] = index
+        self._params['type'] = type
+        self._params['variable-queries'] = params.get("variable-queries", 0)
+        self.infinite = True
+
+        if self._params['variable-queries'] > 0:
+            with open(script_dir + os.sep + 'workload_queries.json', 'r') as f:
+                d = json.loads(f.read())
+                source_file = d['source-file']
+                base_url = d['base-url']
+                compressed_bytes = d['compressed-bytes']
+                uncompressed_bytes = d['uncompressed-bytes']
+                compressed_path = script_dir + os.sep + source_file
+                uncompressed_path = script_dir + os.sep + Path(source_file).stem
+            if not os.path.exists(compressed_path):
+                downloader = Downloader(False, False)
+                downloader.download(base_url, None, compressed_path, compressed_bytes)
+            if not os.path.exists(uncompressed_path):
+                decompressor = Decompressor()
+                decompressor.decompress(compressed_path, uncompressed_path, uncompressed_bytes)
+
+    def partition(self, partition_index, total_partitions):
+        return self
+
+    def params(self):
+        params = self._params
+        with open('model_id.json', 'r') as f:
+            d = json.loads(f.read())
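+        # Inject the ML model id stored in model_id.json into the neural query body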
+        params['body']['query']['neural']['passage_embedding']['model_id'] = d['model_id']
+        count = self._params.get("variable-queries", 0)
+        if count > 0:
+            script_dir = os.path.dirname(os.path.realpath(__file__))
+            with open(script_dir + '/queries.json', 'r') as f:
+                lines = f.read().splitlines()
+                line = random.choice(lines)
+                query_text = json.loads(line)['text']
+                params['body']['query']['neural']['passage_embedding']['query_text'] = query_text
+
+        return params
+
+class QueryParamSourceHybridBm25Neural:
+    def __init__(self, workload, params, **kwargs):
+        if len(workload.indices) == 1:
+            index = workload.indices[0].name
+            if len(workload.indices[0].types) == 1:
+                type = workload.indices[0].types[0].name
+            else:
+                type = None
+        else:
+            index = "_all"
+            type = None
+
+        self._params = params
+        self._params['index'] = index
+        self._params['type'] = type
+        self._params['variable-queries'] = params.get("variable-queries", 0)
+        self.infinite = True
+
+        if self._params['variable-queries'] > 0:
+            with open(script_dir + os.sep + 'workload_queries.json', 'r') as f:
+                d = json.loads(f.read())
+                source_file = d['source-file']
+                base_url = d['base-url']
+                compressed_bytes = d['compressed-bytes']
+                uncompressed_bytes = d['uncompressed-bytes']
+                compressed_path = script_dir + os.sep + source_file
+                uncompressed_path = script_dir + os.sep + Path(source_file).stem
+            if not os.path.exists(compressed_path):
+                downloader = Downloader(False, False)
+                downloader.download(base_url, None, compressed_path, compressed_bytes)
+            if not os.path.exists(uncompressed_path):
+                decompressor = Decompressor()
+                decompressor.decompress(compressed_path, uncompressed_path, uncompressed_bytes)
+
+    def partition(self, partition_index, total_partitions):
+        return self
+
+    def params(self):
+        params = self._params
+        count = self._params.get("variable-queries", 0)
+        if count > 0:
+            script_dir = os.path.dirname(os.path.realpath(__file__))
+            model_id = ''
+            with open('model_id.json', 'r') as f:
+                d = json.loads(f.read())
+                model_id = d['model_id']
+            with open(script_dir + '/queries.json', 'r') as f:
+                lines = f.read().splitlines()
+                line = random.choice(lines)
+                query_text = json.loads(line)['text']
+                match_query = random.choice(query_text.split()).lower()
+                params['body']['query']['hybrid']['queries'][0]['match']['title'] = match_query
+                params['body']['query']['hybrid']['queries'][1]['neural']['passage_embedding']['model_id'] = model_id
+                params['body']['query']['hybrid']['queries'][1]['neural']['passage_embedding']['query_text'] = query_text
+        return params
+
 def register(registry):
     registry.register_param_source("semantic-search-source", QueryParamSource)
+    registry.register_param_source("semantic-search-neural-source", QueryParamSourceNeural)
+    registry.register_param_source("hybrid-query-bm25-neural-search-source", QueryParamSourceHybridBm25Neural)
     registry.register_param_source("create-ingest-pipeline", ingest_pipeline_param_source)