From a4c088d9664e8854c4336b7845cab5890787e354 Mon Sep 17 00:00:00 2001 From: Domenico Manna Date: Tue, 23 May 2023 17:19:18 -0400 Subject: [PATCH] Add ability to store row-level metadata This metadata field allows non-API parameters to be stored, which could be useful when the results need to be associated with an external source. For example, if the user has several rows of text data within a database, and they want to calculate embeddings for each row, they can now use the metadata field to save the row id so that when they parse the results back, they know which embedding corresponds to which database row. --- examples/api_request_parallel_processor.py | 20 +++++++++++++++---- ...example_requests_to_parallel_process.jsonl | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/examples/api_request_parallel_processor.py b/examples/api_request_parallel_processor.py index a505394..7a15615 100644 --- a/examples/api_request_parallel_processor.py +++ b/examples/api_request_parallel_processor.py @@ -31,8 +31,8 @@ python examples/api_request_parallel_processor.py \ Inputs: - requests_filepath : str - path to the file containing the requests to be processed - - file should be a jsonl file, where each line is a json object with API parameters - - e.g., {"model": "text-embedding-ada-002", "input": "embed me"} + - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field + - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl - the code to generate the example file is appended to the bottom of this script @@ -164,6 +164,7 @@ async def process_api_requests_from_file( request_json=request_json, token_consumption=num_tokens_consumed_from_request(request_json, 
api_endpoint, token_encoding_name), attempts_left=max_attempts, + metadata=request_json.pop("metadata", None) ) status_tracker.num_tasks_started += 1 status_tracker.num_tasks_in_progress += 1 @@ -258,6 +259,7 @@ class APIRequest: request_json: dict token_consumption: int attempts_left: int + metadata: dict result: list = field(default_factory=list) async def call_api( @@ -298,11 +300,21 @@ class APIRequest: retry_queue.put_nowait(self) else: logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}") - append_to_jsonl([self.request_json, [str(e) for e in self.result]], save_filepath) + data = ( + [self.request_json, [str(e) for e in self.result], self.metadata] + if self.metadata + else [self.request_json, [str(e) for e in self.result]] + ) + append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_failed += 1 else: - append_to_jsonl([self.request_json, response], save_filepath) + data = ( + [self.request_json, response, self.metadata] + if self.metadata + else [self.request_json, response] + ) + append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_succeeded += 1 logging.debug(f"Request {self.task_id} saved to {save_filepath}") diff --git a/examples/data/example_requests_to_parallel_process.jsonl b/examples/data/example_requests_to_parallel_process.jsonl index 43da2c5..29adf93 100644 --- a/examples/data/example_requests_to_parallel_process.jsonl +++ b/examples/data/example_requests_to_parallel_process.jsonl @@ -1,4 +1,4 @@ -{"model": "text-embedding-ada-002", "input": "0\n"} +{"model": "text-embedding-ada-002", "input": "0\n", "metadata": {"row_id": 1}} {"model": "text-embedding-ada-002", "input": "1\n"} {"model": "text-embedding-ada-002", "input": "2\n"} {"model": "text-embedding-ada-002", "input": "3\n"}