From ebee05609bf3685ad170a590f95e4f4f6679acbf Mon Sep 17 00:00:00 2001 From: Danielwoodh Date: Sun, 11 Aug 2024 13:52:19 +0100 Subject: [PATCH] edit: reformatted files --- simplifine_alpha/annotation.py | 638 ++++---- simplifine_alpha/inference_tools.py | 128 +- simplifine_alpha/logger.py | 182 ++- simplifine_alpha/train_engine.py | 1788 +++++++++++++---------- simplifine_alpha/train_engine_client.py | 129 +- 5 files changed, 1587 insertions(+), 1278 deletions(-) diff --git a/simplifine_alpha/annotation.py b/simplifine_alpha/annotation.py index ea4826c..049011a 100644 --- a/simplifine_alpha/annotation.py +++ b/simplifine_alpha/annotation.py @@ -1,24 +1,25 @@ -''' - Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. - Copyright (C) 2024 Simplifine Corp. +""" +Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. +Copyright (C) 2024 Simplifine Corp. - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. - You should have received a copy of the GNU General Public License - along with this program. If not, see . -''' +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" from openai import OpenAI import os import json + # from text_chunker import TextChunker from tqdm import tqdm from utils import chunk_text_by_words @@ -29,125 +30,143 @@ from openai import AsyncOpenAI - - # TODO: add a function to check the status of a batch request class synthetic: def __init__(self): self.client = OpenAI( - api_key='', + api_key="", ) self.async_client = AsyncOpenAI( - api_key='', + api_key="", ) - self.model_name = 'gpt-4o' + self.model_name = "gpt-4o" self.script_path = os.path.dirname(os.path.abspath(__file__)) # check to see if file to store the batch requests exists - if 'batch requests' not in os.listdir(self.script_path): - print('Creating a directory to store batch requests') - os.mkdir(os.path.join(self.script_path, 'batch requests')) - + if "batch requests" not in os.listdir(self.script_path): + print("Creating a directory to store batch requests") + os.mkdir(os.path.join(self.script_path, "batch requests")) + # variable to store the current batch id self.current_batch = None - - def create_prompt_qa_gen(self, chunk:str): - system_prompt = f""" + def create_prompt_qa_gen(self, chunk: str): + system_prompt = """ You are a helpful assistant creating a question about the text. You are given a text and you need to create a question about it. Just respond with the qeusiton you would ask about the text. Say nothing else. 
""" user_prompt = f"""text: {chunk}""" - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] return messages - - async def create_qa_pair_single_async(self, - text:str, - chunk_size:int=40, - overlap:int=10, - shrink:bool=True, - ratio_of_shrink:float=0.5): + + async def create_qa_pair_single_async( + self, + text: str, + chunk_size: int = 40, + overlap: int = 10, + shrink: bool = True, + ratio_of_shrink: float = 0.5, + ): + """ + Create a synch call for QA generation. + **NOTE: this is a synchronous call, and it is not recommended to use it for large texts. """ - Create a synch call for QA generation. - **NOTE: this is a synchronous call, and it is not recommended to use it for large texts.""" chunks = chunk_text_by_words(text, chunk_size, overlap) if shrink: # creating random indices to shrink the text - shrink_indices = random.sample(range(len(chunks)), int(ratio_of_shrink*len(chunks))) - shrunk_chunks = [chunks[i] for i in range(len(chunks)) if i in shrink_indices] + shrink_indices = random.sample( + range(len(chunks)), int(ratio_of_shrink * len(chunks)) + ) + shrunk_chunks = [ + chunks[i] for i in range(len(chunks)) if i in shrink_indices + ] questions, answers = [], [] for chunk in tqdm(shrunk_chunks): answers.append(chunk) messages = self.create_prompt_qa_gen(chunk) completion = await self.async_client.chat.completions.create( - model=self.model_name, - messages=messages + model=self.model_name, messages=messages + ) + print( + "\n----------------\n", + completion.choices[0].message.content, + "\n----------------\n", ) - print('\n----------------\n',completion.choices[0].message.content,'\n----------------\n') questions.append(completion.choices[0].message.content) return questions, answers, chunks, shrink_indices - - - - def create_qa_pair_single(self, - text:str, - chunk_size:int=40, - overlap:int=10, - shrink:bool=True, - ratio_of_shrink:float=0.5): + + def create_qa_pair_single( + self, + text: str, + chunk_size: int = 40, + overlap: int = 10, + shrink: bool = True, + ratio_of_shrink: float = 0.5, + ): + """ + Create a synch call for QA generation. + **NOTE: this is a synchronous call, and it is not recommended to use it for large texts. """ - Create a synch call for QA generation. 
- **NOTE: this is a synchronous call, and it is not recommended to use it for large texts.""" chunks = chunk_text_by_words(text, chunk_size, overlap) if shrink: # creating random indices to shrink the text - shrink_indices = random.sample(range(len(chunks)), int(ratio_of_shrink*len(chunks))) - shrunk_chunks = [chunks[i] for i in range(len(chunks)) if i in shrink_indices] + shrink_indices = random.sample( + range(len(chunks)), int(ratio_of_shrink * len(chunks)) + ) + shrunk_chunks = [ + chunks[i] for i in range(len(chunks)) if i in shrink_indices + ] questions, answers = [], [] for chunk in tqdm(shrunk_chunks): answers.append(chunk) messages = self.create_prompt_qa_gen(chunk) completion = self.client.chat.completions.create( - model=self.model_name, - messages=messages + model=self.model_name, messages=messages ) questions.append(completion.choices[0].message.content) return questions, answers, chunks, shrink_indices - - def create_qa_pair_batch_file(self, text:str, file_name:str, chunk_size:int=200, overlap:int=10, shrink:bool=True, ratio_of_shrink:float=0.5): + + def create_qa_pair_batch_file( + self, + text: str, + file_name: str, + chunk_size: int = 200, + overlap: int = 10, + shrink: bool = True, + ratio_of_shrink: float = 0.5, + ): + """ + Create a batch file for QA generation. + **NOTE: this is an asynchronous call, and it is recommended to use it for large texts. """ - Create a batch file for QA generation. - **NOTE: this is an asynchronous call, and it is recommended to use it for large texts.""" chunks = chunk_text_by_words(text, chunk_size, overlap=overlap) shrink_indices = list(range(len(chunks))) if shrink: # creating random indices to shrink the text - shrink_indices = random.sample(range(len(chunks)), int(ratio_of_shrink*len(chunks))) - shrunk_chunks = [chunks[i] for i in range(len(chunks)) if i in shrink_indices] - + shrink_indices = random.sample( + range(len(chunks)), int(ratio_of_shrink * len(chunks)) + ) + shrunk_chunks = [ + chunks[i] for i in range(len(chunks)) if i in shrink_indices + ] - save_path = os.path.join(self.script_path, 'batch requests', file_name) - batch_request_path = os.path.join(self.script_path, 'batch requests') + save_path = os.path.join(self.script_path, "batch requests", file_name) + batch_request_path = os.path.join(self.script_path, "batch requests") if file_name in os.listdir(batch_request_path): - print('File already exists') - raise FileExistsError(f'File with name *{file_name}* already exists') + print("File already exists") + raise FileExistsError(f"File with name *{file_name}* already exists") count = 0 for chunk in shrunk_chunks: reqID = f"request-{count}" count += 1 - batch_format = { - "custom_id":"", - "method":"", - "url":"", - "body":{} - } + batch_format = {"custom_id": "", "method": "", "url": "", "body": {}} messages = self.create_prompt_qa_gen(chunk) batch_format["custom_id"] = reqID batch_format["method"] = "POST" @@ -156,28 +175,34 @@ def create_qa_pair_batch_file(self, text:str, file_name:str, chunk_size:int=200, batch_format["body"]["messages"] = messages batch_format["body"]["max_tokens"] = 2000 - if os.path.exists(save_path): - print('File exists') - with open(save_path, 'a') as file: + print("File exists") + with open(save_path, "a") as file: for entry in [batch_format]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: - print('File does not exists') + print("File does not exists") existing_data = {} - for col in ['custom_id', 'method', 'url', 'body']: + for col in ["custom_id", "method", 
"url", "body"]: existing_data[col] = batch_format[col] existing_data = [existing_data] - with open(save_path, 'w') as file: + with open(save_path, "w") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') - - return shrink_indices, chunks - + file.write(json.dumps(entry) + "\n") + + return shrink_indices, chunks # TODO: change to accomodate text as list (multiple docs) - def QA_gen_batch(self, text:str, chunk_size:int=200, overlap:int=10, shrink:bool=True, ratio_of_shrink:float=0.5, description=""): + def QA_gen_batch( + self, + text: str, + chunk_size: int = 200, + overlap: int = 10, + shrink: bool = True, + ratio_of_shrink: float = 0.5, + description="", + ): """ evaluating a batch of questions and answers. args: @@ -185,31 +210,42 @@ def QA_gen_batch(self, text:str, chunk_size:int=200, overlap:int=10, shrink:bool returns: llm_answers: list, a list of answers from the model """ - file_name = str(uuid.uuid4()) + '.jsonl' - shrink_indices, chunks = self.create_qa_pair_batch_file(text, file_name, chunk_size, overlap, shrink, ratio_of_shrink) + file_name = str(uuid.uuid4()) + ".jsonl" + shrink_indices, chunks = self.create_qa_pair_batch_file( + text, file_name, chunk_size, overlap, shrink, ratio_of_shrink + ) - file_path = os.path.join(self.script_path, 'batch requests', file_name) + file_path = os.path.join(self.script_path, "batch requests", file_name) batch_input_file = self.client.files.create( - file=open(file_path, "rb"), - purpose="batch" + file=open(file_path, "rb"), purpose="batch" ) batch_input_file_id = batch_input_file.id returned_json = self.client.batches.create( - input_file_id=batch_input_file_id, - endpoint="/v1/chat/completions", - completion_window="24h", - metadata={ - "description":description - } + input_file_id=batch_input_file_id, + endpoint="/v1/chat/completions", + completion_window="24h", + metadata={"description": description}, ) - task_specific_paramters = {'chunk_size':chunk_size, 'shrink_indices':shrink_indices, 'chunks':chunks} + task_specific_paramters = { + "chunk_size": chunk_size, + "shrink_indices": shrink_indices, + "chunks": chunks, + } # save the metadata for the batch request - log_batch_metadata(returned_json.id, batch_input_file_id, file_name, description, task_specific_paramters=task_specific_paramters, task='ANOT-QA') + log_batch_metadata( + returned_json.id, + batch_input_file_id, + file_name, + description, + task_specific_paramters=task_specific_paramters, + task="ANOT-QA", + ) - - def create_QA_annotation_file(self, batch_id:str, file_path:str, overwrite:bool=False): + def create_QA_annotation_file( + self, batch_id: str, file_path: str, overwrite: bool = False + ): """ retireve the results of a batch request and save it as a jsonl file. **NOTE: file type accepted now is only .jsonl @@ -218,97 +254,95 @@ def create_QA_annotation_file(self, batch_id:str, file_path:str, overwrite:bool= file_path: str, the path of the file to save the results """ - file_extension = file_path.split('.')[-1] - if file_extension != 'jsonl': - raise ValueError(f'File type is {file_extension}, only jsonl is supported at the moment.') + file_extension = file_path.split(".")[-1] + if file_extension != "jsonl": + raise ValueError( + f"File type is {file_extension}, only jsonl is supported at the moment." 
+ ) # check if ID is for batch qa annotation - path = os.path.join(script_path, 'batch requests', 'logger.jsonl') + path = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta_data = read_jsonl(path) batch_not_found = True job_task_dont_match = True for i in range(len(all_meta_data)): - if all_meta_data[i]['id'] == batch_id: + if all_meta_data[i]["id"] == batch_id: batch_not_found = False - if 'ANOT-QA' in all_meta_data[i]['task']: + if "ANOT-QA" in all_meta_data[i]["task"]: job_task_dont_match = False break if batch_not_found: - raise ValueError('Batch ID not found') + raise ValueError("Batch ID not found") if job_task_dont_match: - raise ValueError(f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-QA') + raise ValueError( + f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-QA' + ) matched_resp = match_response_to_request(batch_id, self.client) # writing to the path if os.path.exists(file_path): - print(f'File {file_path} already exists') + print(f"File {file_path} already exists") if overwrite: existing_data = {} for col in list(matched_resp.keys()): existing_data[col] = matched_resp[col] existing_data = [existing_data] - with open(file_path, 'w+') as file: + with open(file_path, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: - with open(file_path, 'a') as file: + with open(file_path, "a") as file: for entry in [file_path]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: existing_data = {} for col in list(matched_resp.keys()): existing_data[col] = matched_resp[col] existing_data = [existing_data] - with open(file_path, 'w+') as file: + with open(file_path, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") - - def create_prompt_summarization(self, text:str): - system_prompt = f"""You are a helpful assistant summarizing a text. You are given a text and you need to create a summary of it. + def create_prompt_summarization(self, text: str): + system_prompt = """You are a helpful assistant summarizing a text. You are given a text and you need to create a summary of it. Be concise and to the point. Include the important information. Just respond with the summary you would create for the text. Say nothing else. """ user_prompt = f"""text: {text}""" - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] return messages - def create_summarization_single(self, text:str): + def create_summarization_single(self, text: str): + """ + Create a synch call for summarization. + **NOTE: this is a synchronous call, and it is not recommended to use it for large texts. """ - Create a synch call for summarization. 
- **NOTE: this is a synchronous call, and it is not recommended to use it for large texts.""" messages = self.create_prompt_summarization(text) completion = self.client.chat.completions.create( - model=self.model_name, - messages=messages + model=self.model_name, messages=messages ) return completion.choices[0].message.content - - - def create_summarization_batch_file(self, text_list:list, file_name:str): + + def create_summarization_batch_file(self, text_list: list, file_name: str): + """ + Create a batch file for summarization. + **NOTE: this is an asynchronous call, and it is recommended to use it for large texts. """ - Create a batch file for summarization. - **NOTE: this is an asynchronous call, and it is recommended to use it for large texts.""" - save_path = os.path.join(self.script_path, 'batch requests', file_name) - batch_request_path = os.path.join(self.script_path, 'batch requests') + save_path = os.path.join(self.script_path, "batch requests", file_name) + batch_request_path = os.path.join(self.script_path, "batch requests") if file_name in os.listdir(batch_request_path): - print('File already exists') - raise FileExistsError(f'File with name *{file_name}* already exists') - + print("File already exists") + raise FileExistsError(f"File with name *{file_name}* already exists") + count = 0 for text in text_list: reqID = f"request-{count}" count += 1 - batch_format = { - "custom_id":"", - "method":"", - "url":"", - "body":{} - } + batch_format = {"custom_id": "", "method": "", "url": "", "body": {}} messages = self.create_prompt_summarization(text) batch_format["custom_id"] = reqID batch_format["method"] = "POST" @@ -317,24 +351,23 @@ def create_summarization_batch_file(self, text_list:list, file_name:str): batch_format["body"]["messages"] = messages batch_format["body"]["max_tokens"] = 2000 - if os.path.exists(save_path): - print('File exists') - with open(save_path, 'a') as file: + print("File exists") + with open(save_path, "a") as file: for entry in [batch_format]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: - print('File does not exists') + print("File does not exists") existing_data = {} - for col in ['custom_id', 'method', 'url', 'body']: + for col in ["custom_id", "method", "url", "body"]: existing_data[col] = batch_format[col] existing_data = [existing_data] - with open(save_path, 'w') as file: + with open(save_path, "w") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') - - def summarization_gen_batch(self, text:list, description=""): + file.write(json.dumps(entry) + "\n") + + def summarization_gen_batch(self, text: list, description=""): """ creating a batch of summarizations. 
args: @@ -342,29 +375,35 @@ def summarization_gen_batch(self, text:list, description=""): file_name: str, the name of the file containing the questions and answers description: str, the description of the batch request """ - file_name = str(uuid.uuid4()) + '.jsonl' + file_name = str(uuid.uuid4()) + ".jsonl" self.create_summarization_batch_file(text, file_name) - file_path = os.path.join(self.script_path, 'batch requests', file_name) + file_path = os.path.join(self.script_path, "batch requests", file_name) batch_input_file = self.client.files.create( - file=open(file_path, "rb"), - purpose="batch" + file=open(file_path, "rb"), purpose="batch" ) batch_input_file_id = batch_input_file.id returned_json = self.client.batches.create( - input_file_id=batch_input_file_id, - endpoint="/v1/chat/completions", - completion_window="24h", - metadata={ - "description":description - } + input_file_id=batch_input_file_id, + endpoint="/v1/chat/completions", + completion_window="24h", + metadata={"description": description}, ) # save the metadata for the batch request - log_batch_metadata(returned_json.id, batch_input_file_id, file_name, description,task='ANOT-SUMMARIZATION',task_specific_paramters={}) + log_batch_metadata( + returned_json.id, + batch_input_file_id, + file_name, + description, + task="ANOT-SUMMARIZATION", + task_specific_paramters={}, + ) - def create_summarization_annotation_file(self, batch_id:str, file_path:str, overwrite:bool=False): + def create_summarization_annotation_file( + self, batch_id: str, file_path: str, overwrite: bool = False + ): """ retireve the results of a batch request and save it as a jsonl file. **NOTE: file type accepted now is only .jsonl @@ -372,93 +411,105 @@ def create_summarization_annotation_file(self, batch_id:str, file_path:str, over batch_id: str, the id of the batch request file_path: str, the path of the file to save the results """ - file_extension = file_path.split('.')[-1] - if file_extension != 'jsonl': - raise ValueError(f'File type is {file_extension}, only jsonl is supported at the moment.') + file_extension = file_path.split(".")[-1] + if file_extension != "jsonl": + raise ValueError( + f"File type is {file_extension}, only jsonl is supported at the moment." 
+ ) # check if ID is for batch qa annotation - path = os.path.join(script_path, 'batch requests', 'logger.jsonl') + path = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta_data = read_jsonl(path) batch_not_found = True job_task_dont_match = True for i in range(len(all_meta_data)): - if all_meta_data[i]['id'] == batch_id: + if all_meta_data[i]["id"] == batch_id: batch_not_found = False - if 'ANOT-SUMMARIZATION' in all_meta_data[i]['task']: + if "ANOT-SUMMARIZATION" in all_meta_data[i]["task"]: job_task_dont_match = False break if batch_not_found: - raise ValueError('Batch ID not found') + raise ValueError("Batch ID not found") if job_task_dont_match: - raise ValueError(f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-SUMMARIZATION') + raise ValueError( + f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-SUMMARIZATION' + ) matched_resp = match_response_to_request(batch_id, self.client) # writing to the path if os.path.exists(file_path): - print(f'File {file_path} already exists') + print(f"File {file_path} already exists") if overwrite: existing_data = {} for col in list(matched_resp.keys()): existing_data[col] = matched_resp[col] existing_data = [existing_data] - with open(file_path, 'w+') as file: + with open(file_path, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: - with open(file_path, 'a') as file: + with open(file_path, "a") as file: for entry in [file_path]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: existing_data = {} for col in list(matched_resp.keys()): existing_data[col] = matched_resp[col] existing_data = [existing_data] - with open(file_path, 'w+') as file: + with open(file_path, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') - - - def clf_prompt(self, text:str, classes:list=[], give_explanation:bool=False, example:str=""): + file.write(json.dumps(entry) + "\n") + + def clf_prompt( + self, + text: str, + classes: list = [], + give_explanation: bool = False, + example: str = "", + ): if len(classes) == 0: - raise ValueError('No classes provided') + raise ValueError("No classes provided") if give_explanation: - exp_str = '''Provide a ratuionale for your classification. Your response should be formatted as: + exp_str = """Provide a ratuionale for your classification. Your response should be formatted as: #class: the assigned class, #rationale: the rational you used to get this label. - For example: User input: text: I love this movie, classes: Positive, Negative. your response: #class: Positive, #rationale: Positive because the user said they love the movie.''' + For example: User input: text: I love this movie, classes: Positive, Negative. your response: #class: Positive, #rationale: Positive because the user said they love the movie.""" else: - exp_str = '' + exp_str = "" system_prompt = f"""You are a helpful assistant classifying a text. You are given a text and you need to classify it. The user will provide you with the classes to choose from. Just respond with the class you would assign to the text. Say nothing else. {exp_str}. \n An example is written below. 
""" user_prompt = f"""text: {text}, classes: {classes}""" - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] return messages - - def clf_batch_file(self, file_name:str, text_list:list, classes:list, give_explanation:bool=False, example:str=""): + + def clf_batch_file( + self, + file_name: str, + text_list: list, + classes: list, + give_explanation: bool = False, + example: str = "", + ): + """ + Create a batch file for classification. + **NOTE: this is an asynchronous call, and it is recommended to use it for large requests. """ - Create a batch file for classification. - **NOTE: this is an asynchronous call, and it is recommended to use it for large requests.""" - save_path = os.path.join(self.script_path, 'batch requests', file_name) - batch_request_path = os.path.join(self.script_path, 'batch requests') + save_path = os.path.join(self.script_path, "batch requests", file_name) + batch_request_path = os.path.join(self.script_path, "batch requests") if file_name in os.listdir(batch_request_path): - print('File already exists') - raise FileExistsError(f'File with name *{file_name}* already exists') - + print("File already exists") + raise FileExistsError(f"File with name *{file_name}* already exists") + count = 0 for text, cl in zip(text_list, classes): reqID = f"request-{count}" count += 1 - batch_format = { - "custom_id":"", - "method":"", - "url":"", - "body":{} - } + batch_format = {"custom_id": "", "method": "", "url": "", "body": {}} messages = self.clf_prompt(text, classes, give_explanation, example) batch_format["custom_id"] = reqID batch_format["method"] = "POST" @@ -467,25 +518,30 @@ def clf_batch_file(self, file_name:str, text_list:list, classes:list, give_expla batch_format["body"]["messages"] = messages batch_format["body"]["max_tokens"] = 2000 - if os.path.exists(save_path): - print('File exists') - with open(save_path, 'a') as file: + print("File exists") + with open(save_path, "a") as file: for entry in [batch_format]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: - print('File does not exists') + print("File does not exists") existing_data = {} - for col in ['custom_id', 'method', 'url', 'body']: + for col in ["custom_id", "method", "url", "body"]: existing_data[col] = batch_format[col] existing_data = [existing_data] - with open(save_path, 'w') as file: + with open(save_path, "w") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') - - - def clf_gen_batch(self, text_list:list, classes:list, give_explanation:bool=False, example:str="", description=""): + file.write(json.dumps(entry) + "\n") + + def clf_gen_batch( + self, + text_list: list, + classes: list, + give_explanation: bool = False, + example: str = "", + description="", + ): """ creating a batch of classifications. 
args: @@ -494,50 +550,58 @@ def clf_gen_batch(self, text_list:list, classes:list, give_explanation:bool=Fals file_name: str, the name of the file containing the questions and answers description: str, the description of the batch request """ - file_name = str(uuid.uuid4()) + '.jsonl' + file_name = str(uuid.uuid4()) + ".jsonl" self.clf_batch_file(file_name, text_list, classes, give_explanation, example) - file_path = os.path.join(self.script_path, 'batch requests', file_name) + file_path = os.path.join(self.script_path, "batch requests", file_name) batch_input_file = self.client.files.create( - file=open(file_path, "rb"), - purpose="batch" + file=open(file_path, "rb"), purpose="batch" ) batch_input_file_id = batch_input_file.id returned_json = self.client.batches.create( - input_file_id=batch_input_file_id, - endpoint="/v1/chat/completions", - completion_window="24h", - metadata={ - "description":description - } + input_file_id=batch_input_file_id, + endpoint="/v1/chat/completions", + completion_window="24h", + metadata={"description": description}, ) - task_specific_paramters = {'give_explanation':give_explanation, 'classes':classes} + task_specific_paramters = { + "give_explanation": give_explanation, + "classes": classes, + } # save the metadata for the batch request - log_batch_metadata(returned_json.id, batch_input_file_id, file_name, description,task='ANOT-CLASSIFICATION',task_specific_paramters=task_specific_paramters) - + log_batch_metadata( + returned_json.id, + batch_input_file_id, + file_name, + description, + task="ANOT-CLASSIFICATION", + task_specific_paramters=task_specific_paramters, + ) # function to check if a batch is complete and matches the task to the response - def check_batch_matches_task(self, batch_id:str, task:str): - path = os.path.join(script_path, 'batch requests', 'logger.jsonl') + def check_batch_matches_task(self, batch_id: str, task: str): + path = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta_data = read_jsonl(path) batch_not_found = True job_task_dont_match = True for i in range(len(all_meta_data)): - if all_meta_data[i]['id'] == batch_id: + if all_meta_data[i]["id"] == batch_id: batch_not_found = False - if task in all_meta_data[i]['task']: + if task in all_meta_data[i]["task"]: job_task_dont_match = False break if batch_not_found: - raise ValueError('Batch ID not found') + raise ValueError("Batch ID not found") if job_task_dont_match: - raise ValueError(f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-CLASSIFICATION') - - + raise ValueError( + f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-CLASSIFICATION' + ) - def create_clf_annotation_file(self, batch_id:str, file_path:str, overwrite:bool=False): + def create_clf_annotation_file( + self, batch_id: str, file_path: str, overwrite: bool = False + ): """ retireve the results of a batch request and save it as a jsonl file. 
**NOTE: file type accepted now is only .jsonl @@ -549,53 +613,56 @@ def create_clf_annotation_file(self, batch_id:str, file_path:str, overwrite:bool # status = get_batch_status(batch_id, self.client) # if status != 'completed': # raise ValueError(f'Batch {batch_id} is not completed yet, current status is {status}') - file_extension = file_path.split('.')[-1] - if file_extension != 'jsonl': - raise ValueError(f'File type is {file_extension}, only jsonl is supported at the moment.') + file_extension = file_path.split(".")[-1] + if file_extension != "jsonl": + raise ValueError( + f"File type is {file_extension}, only jsonl is supported at the moment." + ) # check if ID is for batch qa annotation - path = os.path.join(script_path, 'batch requests', 'logger.jsonl') + path = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta_data = read_jsonl(path) batch_not_found = True job_task_dont_match = True for i in range(len(all_meta_data)): - if all_meta_data[i]['id'] == batch_id: + if all_meta_data[i]["id"] == batch_id: batch_not_found = False - if 'ANOT-CLASSIFICATION' in all_meta_data[i]['task']: + if "ANOT-CLASSIFICATION" in all_meta_data[i]["task"]: job_task_dont_match = False break if batch_not_found: - raise ValueError('Batch ID not found') + raise ValueError("Batch ID not found") if job_task_dont_match: - raise ValueError(f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-CLASSIFICATION') + raise ValueError( + f'Batch ID {batch_id} does not match the task, task recorded as: {all_meta_data[i]["task"]}, target should be: ANOT-CLASSIFICATION' + ) matched_resp = match_response_to_request(batch_id, self.client) # writing to the path if os.path.exists(file_path): - print(f'File {file_path} already exists') + print(f"File {file_path} already exists") if overwrite: existing_data = {} for col in list(matched_resp.keys()): existing_data[col] = matched_resp[col] existing_data = [existing_data] - with open(file_path, 'w+') as file: + with open(file_path, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: - with open(file_path, 'a') as file: + with open(file_path, "a") as file: for entry in [file_path]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: existing_data = {} for col in list(matched_resp.keys()): existing_data[col] = matched_resp[col] existing_data = [existing_data] - with open(file_path, 'w+') as file: + with open(file_path, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') - + file.write(json.dumps(entry) + "\n") - def parse_response_clf(self, batch_id:str=''): + def parse_response_clf(self, batch_id: str = ""): """ Parse the response of a classification batch request. 
args: @@ -603,48 +670,49 @@ def parse_response_clf(self, batch_id:str=''): returns: parsed_response: dict, a dictionary containing the parsed response """ - self.check_batch_matches_task(batch_id, 'ANOT-CLASSIFICATION') + self.check_batch_matches_task(batch_id, "ANOT-CLASSIFICATION") matched_response = match_response_to_request(batch_id, self.client) - task_specific_paramters = matched_response['task_specific_paramters'] - give_explanation = task_specific_paramters['give_explanation'] - classes = task_specific_paramters['classes'] + task_specific_paramters = matched_response["task_specific_paramters"] + give_explanation = task_specific_paramters["give_explanation"] + classes = task_specific_paramters["classes"] # parsing the response with re class_pattern = r"#class:\s*([^,]+)" rational_pattern = r"#rationale:\s*(.+)" - - parsed_response = {'input':[], 'class':[], 'rationale':[]} - for i in range(len(matched_response['request'])): - input_text = matched_response['request'][i] - output = matched_response['response'][i] + parsed_response = {"input": [], "class": [], "rationale": []} + for i in range(len(matched_response["request"])): + input_text = matched_response["request"][i] + output = matched_response["response"][i] # Find matches if not give_explanation: class_match = re.search(class_pattern, output) - rational_match = 'N/A' - rational_value = 'N/A' + rational_match = "N/A" + rational_value = "N/A" else: class_match = re.search(class_pattern, output) rational_match = re.search(rational_pattern, output) class_value = class_match.group(1).strip() rational_value = rational_match.group(1).strip() - + # check if matches are found if class_value in classes: - parsed_response['input'].append(input_text) - parsed_response['class'].append(class_value) - parsed_response['rationale'].append(rational_value) + parsed_response["input"].append(input_text) + parsed_response["class"].append(class_value) + parsed_response["rationale"].append(rational_value) else: - print(f'Class {class_value} not found in classes provided') - + print(f"Class {class_value} not found in classes provided") return parsed_response - # TODO: complete the function - def function_calling_prompt(self, text:str, functions:dict={'function':[], 'parameters':[], 'description':[]}): - system_prompt = f"""You are a helpful assistant calling a function. You are given a text and you need to call the appropriate function. The user will provide you with the functions to choose from. + def function_calling_prompt( + self, + text: str, + functions: dict = {"function": [], "parameters": [], "description": []}, + ): + system_prompt = """You are a helpful assistant calling a function. You are given a text and you need to call the appropriate function. The user will provide you with the functions to choose from. Just respond with the function you would call to the text. Say nothing else. An example is written below. User: get the weather in london for the next week. 
Function 1: get_weather, @@ -658,12 +726,14 @@ def function_calling_prompt(self, text:str, functions:dict={'function':[], 'para """ user_prompt = f"""text: {text}""" - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ] return messages # TODO: complete the function - def function_calling_single(self, text:str, functions:dict={'function':[], 'description':[]}): + def function_calling_single( + self, text: str, functions: dict = {"function": [], "description": []} + ): pass diff --git a/simplifine_alpha/inference_tools.py b/simplifine_alpha/inference_tools.py index bd4b758..84f873b 100644 --- a/simplifine_alpha/inference_tools.py +++ b/simplifine_alpha/inference_tools.py @@ -1,60 +1,58 @@ -''' - Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. - Copyright (C) 2024 Simplifine Corp. +""" +Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. +Copyright (C) 2024 Simplifine Corp. - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. - You should have received a copy of the GNU General Public License - along with this program. If not, see . -''' +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" -from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling +from transformers import AutoTokenizer, AutoModelForCausalLM from dataclasses import dataclass @dataclass class GenerationConfig: - train_type:str - max_length:int - num_return_sequences:int - do_sample:bool - top_k:int - top_p:float - temperature:float - prompt_template:str - response_template:str - keys:list + train_type: str + max_length: int + num_return_sequences: int + do_sample: bool + top_k: int + top_p: float + temperature: float + prompt_template: str + response_template: str + keys: list -def parse_sft_prompt(generate_config:GenerationConfig, data:dict, tokenizer:AutoTokenizer): +def parse_sft_prompt( + generate_config: GenerationConfig, data: dict, tokenizer: AutoTokenizer +): _is_chat = False if tokenizer.chat_template: # dummy messages to extract chat tokens messages = [ - { - "role": "system", - "content": "You are a helpful assistant." - }, - { - "role": "user", - "content": "Who won the world series in 2020?" 
- }, + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Who won the world series in 2020?"}, { "role": "assistant", - "content": "The Los Angeles Dodgers won the World Series in 2020." - } - ] + "content": "The Los Angeles Dodgers won the World Series in 2020.", + }, + ] text = tokenizer.apply_chat_template(messages, tokenize=False) - chat_temp = text.split(messages[1]['content'])[-1].split(messages[-1]['content'])[0] - chat_response_temp = chat_temp.replace(tokenizer.eos_token,'') + chat_temp = text.split(messages[1]["content"])[-1].split( + messages[-1]["content"] + )[0] + chat_response_temp = chat_temp.replace(tokenizer.eos_token, "") _is_chat = True else: _is_chat = False @@ -62,36 +60,56 @@ def parse_sft_prompt(generate_config:GenerationConfig, data:dict, tokenizer:Auto expected_responses = [] for i in range(len(data[generate_config.keys[0]])): formatted_text = generate_config.prompt_template.format( - **{key: data[key][i] for key in generate_config.keys} - ) + **{key: data[key][i] for key in generate_config.keys} + ) if _is_chat: - user_text, assistant_text = formatted_text.split(generate_config.response_template) + user_text, assistant_text = formatted_text.split( + generate_config.response_template + ) assistant_text = generate_config.response_template + assistant_text messages = [ - {"role": "user", "content": user_text}, - {"role": "assistant", "content": assistant_text}, - ] - chat_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False) - chat_prompt_pre_gen, chat_prompt_post_gen = chat_prompt.split(chat_response_temp) + {"role": "user", "content": user_text}, + {"role": "assistant", "content": assistant_text}, + ] + chat_prompt = tokenizer.apply_chat_template( + messages, tokenize=False, add_generation_prompt=False + ) + chat_prompt_pre_gen, chat_prompt_post_gen = chat_prompt.split( + chat_response_temp + ) formatted_prompts.append(chat_prompt_pre_gen) expected_responses.append(chat_prompt_post_gen) else: - formatted_text_pre_gen, formatted_text_post_gen = formatted_text.split(generate_config.response_template) + formatted_text_pre_gen, formatted_text_post_gen = formatted_text.split( + generate_config.response_template + ) formatted_prompts.append(formatted_text_pre_gen) expected_responses.append(formatted_text_post_gen) return formatted_prompts, expected_responses - -def generate_from_pretrained(model:AutoModelForCausalLM, tokenizer:AutoTokenizer, generate_confg:GenerationConfig, - data:dict={}): +def generate_from_pretrained( + model: AutoModelForCausalLM, + tokenizer: AutoTokenizer, + generate_confg: GenerationConfig, + data: dict = {}, +): generated_text = [] - if generate_confg.train_type == 'sft': - formatted_prompt, expected_outputs = parse_sft_prompt(generate_confg, data, tokenizer) + if generate_confg.train_type == "sft": + formatted_prompt, expected_outputs = parse_sft_prompt( + generate_confg, data, tokenizer + ) for prompt, expected_output in zip(formatted_prompt, expected_outputs): inputs = tokenizer(prompt, return_tensors="pt") - outputs = model.generate(**inputs, max_length=generate_confg.max_length, num_return_sequences=generate_confg.num_return_sequences, - do_sample=generate_confg.do_sample, top_k=generate_confg.top_k, top_p=generate_confg.top_p, temperature=generate_confg.temperature) + outputs = model.generate( + **inputs, + max_length=generate_confg.max_length, + num_return_sequences=generate_confg.num_return_sequences, + do_sample=generate_confg.do_sample, + 
top_k=generate_confg.top_k, + top_p=generate_confg.top_p, + temperature=generate_confg.temperature, + ) text = tokenizer.batch_decode(outputs, skip_special_tokens=True) generated_text.append(text) return generated_text diff --git a/simplifine_alpha/logger.py b/simplifine_alpha/logger.py index 361bc86..a6cf306 100644 --- a/simplifine_alpha/logger.py +++ b/simplifine_alpha/logger.py @@ -1,20 +1,21 @@ -''' - Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. - Copyright (C) 2024 Simplifine Corp. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . -''' +""" +Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. +Copyright (C) 2024 Simplifine Corp. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + import json import os from openai import OpenAI @@ -24,7 +25,7 @@ script_path = os.path.dirname(os.path.abspath(__file__)) -def get_task_from_batch_id(batch_id:str): +def get_task_from_batch_id(batch_id: str): """ getting the task from the batch id. args: @@ -32,11 +33,11 @@ def get_task_from_batch_id(batch_id:str): returns: task: str, the task of the batch request """ - filename = os.path.join(script_path, 'batch requests', "logger.jsonl") + filename = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta = read_jsonl(filename) for i in range(len(all_meta)): - if all_meta[i]['id'] == batch_id: - task = all_meta[i]['task'] + if all_meta[i]["id"] == batch_id: + task = all_meta[i]["task"] return task raise Exception("The batch request id is not valid or the file is not found.") @@ -44,27 +45,34 @@ def get_task_from_batch_id(batch_id:str): def write_jsonl(file_path, data, append=True): """ Writes a list of JSON serializable objects to a JSONL file. - + :param file_path: The path to the JSONL file. :param data: The list of JSON serializable objects. :param append: Whether to append to the file or overwrite it. 
""" if os.path.exists(file_path): - with open(file_path, 'a') as file: + with open(file_path, "a") as file: for entry in [file_path]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: existing_data = {} for col in list(data.keys()): existing_data[col] = data[col] existing_data = [existing_data] - with open(file_path, 'w+') as file: + with open(file_path, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") - -def log_batch_metadata(batch_id, input_file_id, input_file_name, description, task, task_specific_paramters=[]): + +def log_batch_metadata( + batch_id, + input_file_id, + input_file_name, + description, + task, + task_specific_paramters=[], +): """ logging the metadata for a batch request. args: @@ -74,32 +82,30 @@ def log_batch_metadata(batch_id, input_file_id, input_file_name, description, ta description: str, the description of the batch request """ metadata = { - "id": batch_id, - "input_file_id": input_file_id, - "input_file_name": input_file_name, - "description": description, - "status": "NA", - 'task': task, - 'task_specific_paramters': task_specific_paramters, + "id": batch_id, + "input_file_id": input_file_id, + "input_file_name": input_file_name, + "description": description, + "status": "NA", + "task": task, + "task_specific_paramters": task_specific_paramters, } - filename = os.path.join(script_path, 'batch requests', "logger.jsonl") + filename = os.path.join(script_path, "batch requests", "logger.jsonl") if os.path.exists(filename): - with open(filename, 'a') as file: + with open(filename, "a") as file: for entry in [metadata]: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") else: existing_data = {} for col in metadata.keys(): existing_data[col] = metadata[col] existing_data = [existing_data] - with open(filename, 'w+') as file: + with open(filename, "w+") as file: for entry in existing_data: - file.write(json.dumps(entry) + '\n') - - + file.write(json.dumps(entry) + "\n") def get_batch_status(client, batch_id): @@ -113,6 +119,7 @@ def get_batch_status(client, batch_id): status = client.batches.retrieve(batch_id).status return status + def get_batch_meta(client, batch_id): """ getting the status of a batch request. @@ -124,6 +131,7 @@ def get_batch_meta(client, batch_id): batch_meta_data = client.batches.retrieve(batch_id) return batch_meta_data + def retireve_batch_results(client, batch_id): """ retrieving the results of a batch request. @@ -134,13 +142,15 @@ def retireve_batch_results(client, batch_id): """ status_meta_data = get_batch_meta(client, batch_id) if status_meta_data.status == "completed": - content = client.files.content(status_meta_data.output_file_id).content.decode('utf-8') + content = client.files.content(status_meta_data.output_file_id).content.decode( + "utf-8" + ) return content else: raise Exception("The batch request is not completed yet.") - -def retireve_batch_input(batch_id:str, return_task_parameters:bool=True): + +def retireve_batch_input(batch_id: str, return_task_parameters: bool = True): """ retrieving the input of a batch request. 
args: @@ -148,22 +158,22 @@ def retireve_batch_input(batch_id:str, return_task_parameters:bool=True): returns: results: list, a list of results from the batch request """ - filename = os.path.join(script_path, 'batch requests', "logger.jsonl") + filename = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta = read_jsonl(filename) input_file_name = None for i in range(len(all_meta)): - if all_meta[i]['id'] == batch_id: - input_file_name = all_meta[i]['input_file_name'] - task_params = all_meta[i]['task_specific_paramters'] + if all_meta[i]["id"] == batch_id: + input_file_name = all_meta[i]["input_file_name"] + task_params = all_meta[i]["task_specific_paramters"] if input_file_name is None: raise Exception("The batch request id is not valid or the file is not found.") - input_file_path = os.path.join(script_path, 'batch requests', input_file_name) + input_file_path = os.path.join(script_path, "batch requests", input_file_name) meta_data = read_jsonl(input_file_path) if return_task_parameters: return meta_data, task_params else: return meta_data - + def update_batch_status_signle(client, batch_id): """ @@ -172,17 +182,15 @@ def update_batch_status_signle(client, batch_id): client: OpenAI, the OpenAI client batch_id: str, the id of the batch request """ - path = os.path.join(script_path, 'batch requests', 'logger.jsonl') + path = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta_data = read_jsonl(path) for i in range(len(all_meta_data)): - if all_meta_data[i]['id'] == batch_id: - all_meta_data[i]['status'] = get_batch_status(client, batch_id) - - with open(path, 'w') as file: - for entry in all_meta_data: - file.write(json.dumps(entry)) + '\n' - + if all_meta_data[i]["id"] == batch_id: + all_meta_data[i]["status"] = get_batch_status(client, batch_id) + with open(path, "w") as file: + for entry in all_meta_data: + file.write(json.dumps(entry)) + "\n" def update_batch_status(client): @@ -191,15 +199,23 @@ def update_batch_status(client): args: client: OpenAI, the OpenAI client """ - path = os.path.join(script_path, 'batch requests', 'logger.jsonl') + path = os.path.join(script_path, "batch requests", "logger.jsonl") all_meta_data = read_jsonl(path) for i in range(len(all_meta_data)): - if all_meta_data[i]['status'] != 'completed' or all_meta_data[i]['status'] != 'failed' or all_meta_data[i]['status'] != 'canceled' or all_meta_data[i]['status'] != 'expired': - all_meta_data[i]['status'] = get_batch_status(client, all_meta_data[i]['id']) - - with open(path, 'w') as file: + if ( + all_meta_data[i]["status"] != "completed" + or all_meta_data[i]["status"] != "failed" + or all_meta_data[i]["status"] != "canceled" + or all_meta_data[i]["status"] != "expired" + ): + all_meta_data[i]["status"] = get_batch_status( + client, all_meta_data[i]["id"] + ) + + with open(path, "w") as file: for entry in all_meta_data: - file.write(json.dumps(entry) + '\n') + file.write(json.dumps(entry) + "\n") + def match_response_to_request(batch_id, client): """ @@ -213,32 +229,36 @@ def match_response_to_request(batch_id, client): # converting the output from openAI to json/list cont = retireve_batch_results(client, batch_id) resps = [] - for i in cont.split('\n'): - if len(i)>5: + for i in cont.split("\n"): + if len(i) > 5: resps.append(json.loads(i)) - + # getting the input data reqs, task_specific_paramters = retireve_batch_input(batch_id) - matched_response = {'custom_id':[], 'request':[], 'response':[], 'task_specific_paramters':task_specific_paramters} + matched_response = { + 
"custom_id": [], + "request": [], + "response": [], + "task_specific_paramters": task_specific_paramters, + } # TODO: optimize this for resp in resps: - cur_id = resp['custom_id'] - matched_response['custom_id'].append(cur_id) + cur_id = resp["custom_id"] + matched_response["custom_id"].append(cur_id) for req in reqs: - if req['custom_id'] == cur_id: - cur_resp = resp['response']['body']['choices'][0]['message']['content'] - cur_req = req['body']['messages'][-1]['content'] - matched_response['request'].append(cur_req) - matched_response['response'].append(cur_resp) + if req["custom_id"] == cur_id: + cur_resp = resp["response"]["body"]["choices"][0]["message"]["content"] + cur_req = req["body"]["messages"][-1]["content"] + matched_response["request"].append(cur_req) + matched_response["response"].append(cur_resp) break return matched_response - -if __name__ == '__main__': + +if __name__ == "__main__": client = OpenAI( - api_key='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', + api_key="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", ) - batch_id = 'batch_hGsLl1EwGbyPO3PatxFX19Mj' + batch_id = "batch_hGsLl1EwGbyPO3PatxFX19Mj" print(match_response_to_request(batch_id, client)) - diff --git a/simplifine_alpha/train_engine.py b/simplifine_alpha/train_engine.py index bc1a37f..3183aec 100644 --- a/simplifine_alpha/train_engine.py +++ b/simplifine_alpha/train_engine.py @@ -1,25 +1,32 @@ -''' - Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. - Copyright (C) 2024 Simplifine Corp. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . -''' +""" +Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. +Copyright (C) 2024 Simplifine Corp. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . 
+""" + from torch.optim import AdamW from transformers import get_scheduler import torch from tqdm.auto import tqdm -from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForLanguageModeling +from transformers import ( + AutoTokenizer, + AutoModelForCausalLM, + TrainingArguments, + Trainer, + DataCollatorForLanguageModeling, +) from torch.utils.data import DataLoader from transformers import AutoModelForSequenceClassification from torch.utils.data import DataLoader @@ -33,7 +40,11 @@ ) from sentence_transformers.util import cos_sim from datasets import load_dataset, concatenate_datasets, DatasetDict -from sentence_transformers.losses import MatryoshkaLoss, MultipleNegativesRankingLoss, ContrastiveLoss +from sentence_transformers.losses import ( + MatryoshkaLoss, + MultipleNegativesRankingLoss, + ContrastiveLoss, +) from sentence_transformers import SentenceTransformerTrainingArguments from sentence_transformers.training_args import BatchSamplers from sentence_transformers import SentenceTransformerTrainingArguments @@ -52,10 +63,9 @@ @dataclass class wandbConfig: - wandb_api_key:str - project:str - config:dict - + wandb_api_key: str + project: str + config: dict def init_device(): @@ -85,22 +95,29 @@ def init_device(): device_name = None if torch.backends.mps.is_available(): - print('Using MPS') + print("Using MPS") device = torch.device("mps") - device_name = 'mps' + device_name = "mps" elif torch.cuda.is_available(): - print('Using CUDA') + print("Using CUDA") device = torch.device("cuda") - device_name = 'cuda' + device_name = "cuda" else: - print('Using CPU') + print("Using CPU") device = torch.device("cpu") - device_name = 'cpu' + device_name = "cpu" return device, device_name -def hf_finetune_llm_qa(model_name:str, dataset_name:str='', - from_hf:bool=False, queries:list=[], context:list=[], answers:list=[], hf_token=''): +def hf_finetune_llm_qa( + model_name: str, + dataset_name: str = "", + from_hf: bool = False, + queries: list = [], + context: list = [], + answers: list = [], + hf_token="", +): """ Fine-tunes a language model for question answering tasks. 
@@ -137,58 +154,66 @@ def hf_finetune_llm_qa(model_name:str, dataset_name:str='', if from_hf: pass else: - dataset = Dataset.from_dict({'questions':queries, 'contexts':context, 'answers':answers}) + dataset = Dataset.from_dict( + {"questions": queries, "contexts": context, "answers": answers} + ) dataset = dataset.train_test_split(0.2) - + def formatting_prompts_func(example): output_texts = [] - for i in range(len(example['questions'])): - text = f'''### Question: {example['questions'][i]}\n ### Context: {example['contexts'][i]}, ### Answer: {example['answers'][i]}''' + for i in range(len(example["questions"])): + text = f"""### Question: {example['questions'][i]}\n ### Context: {example['contexts'][i]}, ### Answer: {example['answers'][i]}""" output_texts.append(text) return output_texts - + response_template = "### Answer:" collator = DataCollatorForCompletionOnlyLM(response_template, tokenizer=tokenizer) default_args = { - "output_dir": "tmp", - "num_train_epochs": 1, - "log_level": "error", - "report_to": "none", + "output_dir": "tmp", + "num_train_epochs": 1, + "log_level": "error", + "report_to": "none", } training_args = TrainingArguments( - per_device_train_batch_size=1, - gradient_accumulation_steps=4, - eval_steps=10, - logging_steps=10, - **default_args) - + per_device_train_batch_size=1, + gradient_accumulation_steps=4, + eval_steps=10, + logging_steps=10, + **default_args, + ) + trainer = SFTTrainer( - model, - max_seq_length=256, - train_dataset=dataset['train'], - eval_dataset=dataset['test'], - args=training_args, - formatting_func=formatting_prompts_func, - data_collator=collator + model, + max_seq_length=256, + train_dataset=dataset["train"], + eval_dataset=dataset["test"], + args=training_args, + formatting_func=formatting_prompts_func, + data_collator=collator, ) - print('\n######################## EVAL PRE-TRAIN ########################\n') + print("\n######################## EVAL PRE-TRAIN ########################\n") print(trainer.evaluate()) - print('\n######################## TRAIN ########################\n') + print("\n######################## TRAIN ########################\n") trainer.train() - print('\n######################## EVAL POST-TRAIN ########################\n') + print("\n######################## EVAL POST-TRAIN ########################\n") print(trainer.evaluate()) - # TODO: attend to this -def hf_finetune_embedder_contrastive(model_name:str, dataset_name:str='', - queries:list=[], positives:list=[], negatives:list=[], - test_split_ratio:float=0.2, - from_hf:bool=False, use_matryoshka:bool=False, - matryoshka_dimensions:list=[], - eval_type:str='ir'): +def hf_finetune_embedder_contrastive( + model_name: str, + dataset_name: str = "", + queries: list = [], + positives: list = [], + negatives: list = [], + test_split_ratio: float = 0.2, + from_hf: bool = False, + use_matryoshka: bool = False, + matryoshka_dimensions: list = [], + eval_type: str = "ir", +): """ Fine-tunes a sentence transformer model using contrastive loss for embedding tasks. 
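
As a quick illustration of the contrastive data layout that hf_finetune_embedder_contrastive builds internally (see the training_data construction in the hunk below), here is the same pairing written out with toy strings; the example sentences are illustrative only.

queries   = ["What is AI?", "What is ML?"]
positives = ["AI is the simulation of human intelligence processes by machines.",
             "ML studies algorithms that improve automatically through data."]
negatives = ["The weather is nice today.",
             "She walked to the store."]

training_data = {
    "sentence1": queries * 2,                              # each query paired once with a positive, once with a negative
    "sentence2": positives + negatives,                    # positives first, then negatives
    "label": [1] * len(positives) + [0] * len(negatives),  # 1 = relevant pair, 0 = irrelevant pair
}
# Resulting pairs:
#   ("What is AI?", positives[0], 1), ("What is ML?", positives[1], 1),
#   ("What is AI?", negatives[0], 0), ("What is ML?", negatives[1], 0)
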
@@ -212,10 +237,10 @@ def hf_finetune_embedder_contrastive(model_name:str, dataset_name:str='', Example: >>> hf_finetune_embedder_contrastive( - model_name="sentence-transformers/all-MiniLM-L6-v2", - queries=["What is AI?"], - positives=["AI is the simulation of human intelligence processes by machines."], - negatives=["The weather is nice today."], + model_name="sentence-transformers/all-MiniLM-L6-v2", + queries=["What is AI?"], + positives=["AI is the simulation of human intelligence processes by machines."], + negatives=["The weather is nice today."], test_split_ratio=0.2 ) @@ -236,23 +261,20 @@ def hf_finetune_embedder_contrastive(model_name:str, dataset_name:str='', "label": [1, 0], }) """ - + device_name = None device, deivce_name = init_device() # check if the model is available on the GPU, then implement flash attention 2 - if device_name == 'cuda': + if device_name == "cuda": model = SentenceTransformer( - model_name, - model_kwargs={"attn_implementation": "sdpa"} - ,trust_remote_code=True + model_name, + model_kwargs={"attn_implementation": "sdpa"}, + trust_remote_code=True, ) else: - model = SentenceTransformer( - model_name, device=device - ,trust_remote_code=True - ) - + model = SentenceTransformer(model_name, device=device, trust_remote_code=True) + if from_hf: pass else: @@ -269,30 +291,31 @@ def hf_finetune_embedder_contrastive(model_name:str, dataset_name:str='', # "sentence2": ["It's so sunny.", "She walked to the store."], # "label": [1, 0], # }) - + training_data = { - "sentence1": queries*2, + "sentence1": queries * 2, "sentence2": positives + negatives, - "label": [1]*len(positives) + [0]*len(negatives) + "label": [1] * len(positives) + [0] * len(negatives), } - + dataset = Dataset.from_dict(training_data).shuffle(seed=22) dataset = dataset.train_test_split(test_split_ratio) - dataset_train = dataset['train'] - dataset_test = dataset['test'] - - + dataset_train = dataset["train"] + dataset_test = dataset["test"] # seperate dics for IR eval - corpus = {'ids':range(len(positives)+len(negatives)), 'docs':positives+negatives} - queries = {'ids':range(len(queries)), 'docs':queries} - queries = dict(zip(queries['ids'], queries['docs'])) - corpus = dict(zip(corpus['ids'], corpus['docs'])) + corpus = { + "ids": range(len(positives) + len(negatives)), + "docs": positives + negatives, + } + queries = {"ids": range(len(queries)), "docs": queries} + queries = dict(zip(queries["ids"], queries["docs"])) + corpus = dict(zip(corpus["ids"], corpus["docs"])) relevant_docs = {} # Query ID to relevant documents (qid => set([relevant_cids]) for q_id in queries: relevant_docs[q_id] = [q_id] - + evaluator_ir = InformationRetrievalEvaluator( queries=queries, corpus=corpus, @@ -316,61 +339,66 @@ def hf_finetune_embedder_contrastive(model_name:str, dataset_name:str='', matryoshka_evaluators.append(ir_evaluator) evaluator_matryoshka = SequentialEvaluator(matryoshka_evaluators) training_loss = MatryoshkaLoss( - model, ContrastiveLoss(model), matryoshka_dims=matryoshka_dimensions - ) + model, ContrastiveLoss(model), matryoshka_dims=matryoshka_dimensions + ) else: training_loss = ContrastiveLoss(model) - if device_name == 'mps': - optim = 'adamw_torch' + if device_name == "mps": + optim = "adamw_torch" else: - optim = 'adamw_torch_fused' + optim = "adamw_torch_fused" # define training arguments args = SentenceTransformerTrainingArguments( - output_dir="bge-base-financial-matryoshka", # output directory and hugging face model ID - num_train_epochs=4, # number of epochs - 
per_device_train_batch_size=32, # train batch size - gradient_accumulation_steps=16, # for a global batch size of 512 - per_device_eval_batch_size=16, # evaluation batch size - warmup_ratio=0.1, # warmup ratio - learning_rate=2e-5, # learning rate, 2e-5 is a good value - lr_scheduler_type="cosine", # use constant learning rate scheduler - optim=optim, # use fused adamw optimizer - tf32=False, # use tf32 precision - bf16=False, # use bf16 precision - fp16=False, - batch_sampler=BatchSamplers.NO_DUPLICATES, # MultipleNegativesRankingLoss benefits from no duplicate samples in a batch - eval_strategy="epoch", # evaluate after each epoch - save_strategy="epoch", # save after each epoch - logging_steps=10, # log every 10 steps - save_total_limit=3, # save only the last 3 models + output_dir="bge-base-financial-matryoshka", # output directory and hugging face model ID + num_train_epochs=4, # number of epochs + per_device_train_batch_size=32, # train batch size + gradient_accumulation_steps=16, # for a global batch size of 512 + per_device_eval_batch_size=16, # evaluation batch size + warmup_ratio=0.1, # warmup ratio + learning_rate=2e-5, # learning rate, 2e-5 is a good value + lr_scheduler_type="cosine", # use constant learning rate scheduler + optim=optim, # use fused adamw optimizer + tf32=False, # use tf32 precision + bf16=False, # use bf16 precision + fp16=False, + batch_sampler=BatchSamplers.NO_DUPLICATES, # MultipleNegativesRankingLoss benefits from no duplicate samples in a batch + eval_strategy="epoch", # evaluate after each epoch + save_strategy="epoch", # save after each epoch + logging_steps=10, # log every 10 steps + save_total_limit=3, # save only the last 3 models ) - + if use_matryoshka: evaluator = evaluator_matryoshka else: evaluator = evaluator_ir trainer = SentenceTransformerTrainer( - model=model, # bg-base-en-v1 - args=args, # training arguments - train_dataset=dataset_train, - loss=training_loss, - evaluator=evaluator, + model=model, # bg-base-en-v1 + args=args, # training arguments + train_dataset=dataset_train, + loss=training_loss, + evaluator=evaluator, ) trainer.train() # TODO: make the eval type more dynamic -def hf_finetune_embedder_positive(model_name:str, dataset_name:str='', - questions:list=[], docs:list=[], - from_hf:bool=False, use_matryoshka:bool=False, - matryoshka_dimensions:list=[], - relevant_ids:list=[], - train_split_ratio:float=0.2, - do_split:bool=False,): +def hf_finetune_embedder_positive( + model_name: str, + dataset_name: str = "", + questions: list = [], + docs: list = [], + from_hf: bool = False, + use_matryoshka: bool = False, + matryoshka_dimensions: list = [], + relevant_ids: list = [], + train_split_ratio: float = 0.2, + do_split: bool = False, +): """ Fine-tunes a SentenceTransformer model for embedding positive examples. 
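
A minimal usage sketch for the hf_finetune_embedder_positive signature reformatted above, again assuming the package is importable as simplifine_alpha; the embedding model id and toy corpus are placeholders.

from simplifine_alpha.train_engine import hf_finetune_embedder_positive

# Toy retrieval corpus, repeated so the internal 80/20 question hold-out is non-empty.
questions = ["What is AI?", "Where is the Eiffel Tower?"] * 5
docs      = ["AI is the simulation of human intelligence processes by machines.",
             "The Eiffel Tower is located in Paris, France."] * 5
relevant_ids = list(range(len(questions)))   # question i is answered by docs[i]

hf_finetune_embedder_positive(
    model_name="sentence-transformers/all-MiniLM-L6-v2",  # placeholder embedding model
    questions=questions,
    docs=docs,
    relevant_ids=relevant_ids,
    do_split=True,   # hold out the last 20% of questions for the IR evaluator
)
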
@@ -389,43 +417,44 @@ def hf_finetune_embedder_positive(model_name:str, dataset_name:str='', Returns: None """ - + device_name = None device, deivce_name = init_device() # check if the model is available on the GPU, then implemenbt flash attention 2 if torch.cuda.is_available(): model = SentenceTransformer( - model_name, - model_kwargs={"attn_implementation": "sdpa"} - ,trust_remote_code=True + model_name, + model_kwargs={"attn_implementation": "sdpa"}, + trust_remote_code=True, ) else: - model = SentenceTransformer( - model_name, device=device - ,trust_remote_code=True - ) - + model = SentenceTransformer(model_name, device=device, trust_remote_code=True) + if from_hf: pass else: if do_split: - split_index = int(len(questions)*train_split_ratio) - questions, questions_eval = questions[:-split_index], questions[-split_index:] - relevant_ids, relevant_ids_eval = relevant_ids[:-split_index], relevant_ids[-split_index:] + split_index = int(len(questions) * train_split_ratio) + questions, questions_eval = ( + questions[:-split_index], + questions[-split_index:], + ) + relevant_ids, relevant_ids_eval = ( + relevant_ids[:-split_index], + relevant_ids[-split_index:], + ) - dataset_q = Dataset.from_dict({'queries':questions_eval}) - dataset_c = Dataset.from_dict({'docs':docs}) + dataset_q = Dataset.from_dict({"queries": questions_eval}) + dataset_c = Dataset.from_dict({"docs": docs}) dataset_q = dataset_q.add_column("id", range(len(dataset_q))) dataset_c = dataset_c.add_column("id", range(len(dataset_c))) else: - dataset_q = Dataset.from_dict({'queries':questions}) - dataset_c = Dataset.from_dict({'docs':docs}) + dataset_q = Dataset.from_dict({"queries": questions}) + dataset_c = Dataset.from_dict({"docs": docs}) dataset_q = dataset_q.add_column("id", range(len(dataset_q))) dataset_c = dataset_c.add_column("id", range(len(dataset_c))) - - # Convert the datasets to dictionaries corpus = dict( zip(dataset_c["id"], dataset_c["docs"]) @@ -435,14 +464,16 @@ def hf_finetune_embedder_positive(model_name:str, dataset_name:str='', ) # Our queries (qid => question if do_split: - relevant_docs = {} # Query ID to relevant documents (qid => set([relevant_cids]) - for num,q_id in enumerate(queries): + relevant_docs = ( + {} + ) # Query ID to relevant documents (qid => set([relevant_cids]) + for num, q_id in enumerate(queries): relevant_docs[q_id] = [relevant_ids_eval[num]] else: relevant_docs = {} - for num,q_id in enumerate(queries): + for num, q_id in enumerate(queries): relevant_docs[q_id] = [relevant_ids[num]] - + evaluator_ir = InformationRetrievalEvaluator( queries=queries, corpus=corpus, @@ -466,66 +497,70 @@ def hf_finetune_embedder_positive(model_name:str, dataset_name:str='', matryoshka_evaluators.append(ir_evaluator) evaluator_matryoshka = SequentialEvaluator(matryoshka_evaluators) training_loss = MatryoshkaLoss( - model, MultipleNegativesRankingLoss(model), matryoshka_dims=matryoshka_dimensions - ) + model, + MultipleNegativesRankingLoss(model), + matryoshka_dims=matryoshka_dimensions, + ) else: training_loss = MultipleNegativesRankingLoss(model) - if device_name == 'mps': - optim = 'adamw_torch' + if device_name == "mps": + optim = "adamw_torch" else: - optim = 'adamw_torch_fused' + optim = "adamw_torch_fused" # define training arguments args = SentenceTransformerTrainingArguments( - output_dir="bge-base-financial-matryoshka", # output directory and hugging face model ID - num_train_epochs=4, # number of epochs - per_device_train_batch_size=32, # train batch size - gradient_accumulation_steps=16, # 
for a global batch size of 512 - per_device_eval_batch_size=16, # evaluation batch size - warmup_ratio=0.1, # warmup ratio - learning_rate=2e-5, # learning rate, 2e-5 is a good value - lr_scheduler_type="cosine", # use constant learning rate scheduler - optim=optim, # use fused adamw optimizer - tf32=False, # use tf32 precision - bf16=False, # use bf16 precision - fp16=False, - batch_sampler=BatchSamplers.NO_DUPLICATES, # MultipleNegativesRankingLoss benefits from no duplicate samples in a batch - eval_strategy="epoch", # evaluate after each epoch - save_strategy="epoch", # save after each epoch - logging_steps=10, # log every 10 steps - save_total_limit=3, # save only the last 3 models + output_dir="bge-base-financial-matryoshka", # output directory and hugging face model ID + num_train_epochs=4, # number of epochs + per_device_train_batch_size=32, # train batch size + gradient_accumulation_steps=16, # for a global batch size of 512 + per_device_eval_batch_size=16, # evaluation batch size + warmup_ratio=0.1, # warmup ratio + learning_rate=2e-5, # learning rate, 2e-5 is a good value + lr_scheduler_type="cosine", # use constant learning rate scheduler + optim=optim, # use fused adamw optimizer + tf32=False, # use tf32 precision + bf16=False, # use bf16 precision + fp16=False, + batch_sampler=BatchSamplers.NO_DUPLICATES, # MultipleNegativesRankingLoss benefits from no duplicate samples in a batch + eval_strategy="epoch", # evaluate after each epoch + save_strategy="epoch", # save after each epoch + logging_steps=10, # log every 10 steps + save_total_limit=3, # save only the last 3 models ) - train_dataset = Dataset.from_dict({'anchor':questions, 'positive':[docs[i] for i in relevant_ids]}) - print('\n-----------------------------------------\n') + train_dataset = Dataset.from_dict( + {"anchor": questions, "positive": [docs[i] for i in relevant_ids]} + ) + print("\n-----------------------------------------\n") print(len(questions)) print(len(questions_eval)) - print('\n-----------------------------------------\n') - + print("\n-----------------------------------------\n") + if use_matryoshka: evaluator = evaluator_matryoshka else: evaluator = evaluator_ir trainer = SentenceTransformerTrainer( - model=model, # bg-base-en-v1 - args=args, # training arguments - train_dataset=train_dataset.select_columns( - ["positive", "anchor"] - ), # training dataset - loss=training_loss, - evaluator=evaluator, + model=model, # bg-base-en-v1 + args=args, # training arguments + train_dataset=train_dataset.select_columns( + ["positive", "anchor"] + ), # training dataset + loss=training_loss, + evaluator=evaluator, ) - print('\n######################## EVAL PRE-TRAIN ########################\n') + print("\n######################## EVAL PRE-TRAIN ########################\n") pre_train_metrics = evaluator(model) print(pre_train_metrics) trainer.train() - print('\n######################## EVAL POST-TRAIN ########################\n') + print("\n######################## EVAL POST-TRAIN ########################\n") post_train_metrics = evaluator(model) print(post_train_metrics) - + def cleanup(): """ function to cleanup the distributed process group. 
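
The training arguments above pair per_device_train_batch_size=32 with gradient_accumulation_steps=16; the "global batch size of 512" noted in the comments follows from this small calculation, assuming a single device.

# Effective (global) batch size implied by the SentenceTransformerTrainingArguments above.
per_device_train_batch_size = 32
gradient_accumulation_steps = 16
num_devices = 1   # assumed single device; multiply by the visible GPU count otherwise

effective_batch_size = per_device_train_batch_size * gradient_accumulation_steps * num_devices
print(effective_batch_size)   # 32 * 16 * 1 = 512, matching the comment in the args
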
@@ -552,11 +587,22 @@ class sftPromptConfig(PromptConfig): def sft_train( - model_name:str, dataset_name:str=None, hf_token:str='', dataset_config_name:str=None, data_from_hf:bool=True, - do_split:bool=True, split_ratio:float=0.2, use_peft:bool=False, lora_config:LoraConfig=None, - sft_config:SFTConfig=None, data:dict={}, wandb_config:wandbConfig=None, - use_ddp:bool=False, use_zero:bool=True, sft_prompt_config:sftPromptConfig=None -): + model_name: str, + dataset_name: str = None, + hf_token: str = "", + dataset_config_name: str = None, + data_from_hf: bool = True, + do_split: bool = True, + split_ratio: float = 0.2, + use_peft: bool = False, + lora_config: LoraConfig = None, + sft_config: SFTConfig = None, + data: dict = {}, + wandb_config: wandbConfig = None, + use_ddp: bool = False, + use_zero: bool = True, + sft_prompt_config: sftPromptConfig = None, +): """ Train a model using the Supervised Finetuning (SFT) process. @@ -613,35 +659,39 @@ def sft_train( if dist.is_initialized(): print("Destroying existing process group") dist.destroy_process_group() - + if use_ddp and use_zero: raise ValueError("Only one dist method is accepted at once.") - + if use_ddp: sft_config.deepspeed = None - + if response_template not in template: - raise ValueError('The response template must be in the template') + raise ValueError("The response template must be in the template") if system_message_key and system_message: - raise ValueError('Only provide key from dataset or system message as a string, not both') - + raise ValueError( + "Only provide key from dataset or system message as a string, not both" + ) + if sft_config is None: - raise ValueError('SFT config must be provided') - + raise ValueError("SFT config must be provided") + if data_from_hf and not dataset_name: - raise ValueError('Dataset name must be provided if data is from Hugging Face') - + raise ValueError("Dataset name must be provided if data is from Hugging Face") + # initialize the training arguments if sft_config is None: - raise ValueError('SFT config must be provided') - + raise ValueError("SFT config must be provided") + if use_ddp and sft_config.gradient_checkpointing: - print('[WARNING]: Gradient checkpointing is not supported with DDP. Disabling gradient checkpointing.') + print( + "[WARNING]: Gradient checkpointing is not supported with DDP. Disabling gradient checkpointing." + ) sft_config.gradient_checkpointing = False # initialize the tokenizer - tokenizer = AutoTokenizer.from_pretrained(model_name, token = hf_token) + tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token) if tokenizer.pad_token is None: if new_padding_token: tokenizer.pad_token = new_padding_token @@ -652,59 +702,59 @@ def sft_train( # replacing the response template, with chat/instruction based tokens. # tokenizing response tempaltes in context can be different to ones without it if use_chat_template: - if tokenizer.chat_template: - # dummy messages to extract chat tokens - messages = [ - { - "role": "system", - "content": "You are a helpful assistant." - }, - { - "role": "user", - "content": "Who won the world series in 2020?" - }, - { - "role": "assistant", - "content": "The Los Angeles Dodgers won the World Series in 2020." 
- } - ] - text = tokenizer.apply_chat_template(messages, tokenize=False) - chat_temp = text.split(messages[1]['content'])[-1].split(messages[-1]['content'])[0] - chat_response_temp = chat_temp.replace(tokenizer.eos_token,'') - else: - raise ValueError('Tokenizer does not have chat template') - - sft_config.remove_unused_columns=False + if tokenizer.chat_template: + # dummy messages to extract chat tokens + messages = [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Who won the world series in 2020?"}, + { + "role": "assistant", + "content": "The Los Angeles Dodgers won the World Series in 2020.", + }, + ] + text = tokenizer.apply_chat_template(messages, tokenize=False) + chat_temp = text.split(messages[1]["content"])[-1].split( + messages[-1]["content"] + )[0] + chat_response_temp = chat_temp.replace(tokenizer.eos_token, "") + else: + raise ValueError("Tokenizer does not have chat template") + + sft_config.remove_unused_columns = False if data_from_hf: try: if dataset_config_name: - raw_datasets = load_dataset(dataset_name, dataset_config_name, token=False, split='train') + raw_datasets = load_dataset( + dataset_name, dataset_config_name, token=False, split="train" + ) else: - raw_datasets = load_dataset(dataset_name, token=False, split='train') + raw_datasets = load_dataset(dataset_name, token=False, split="train") except Exception as e: - print(f'Error: {e}') + print(f"Error: {e}") if dataset_config_name: - raw_datasets = load_dataset(dataset_name, dataset_config_name, token=False) + raw_datasets = load_dataset( + dataset_name, dataset_config_name, token=False + ) else: raw_datasets = load_dataset(dataset_name, token=False) - raw_datasets = raw_datasets['train'] + raw_datasets = raw_datasets["train"] if do_split: raw_datasets = raw_datasets.train_test_split(split_ratio) else: raw_datasets = Dataset.from_dict(data) if do_split: raw_datasets = raw_datasets.train_test_split(split_ratio) - + # function to format the prompts def formatting_prompts_func(example): output_texts = [] - + if not tokenizer.chat_template: - for i in range(len(example[keys[0]])): - formatted_text = template.format( - **{key: example[key][i] for key in keys} - ) - output_texts.append(formatted_text) + for i in range(len(example[keys[0]])): + formatted_text = template.format( + **{key: example[key][i] for key in keys} + ) + output_texts.append(formatted_text) else: for i in range(len(example[keys[0]])): formatted_text = template.format( @@ -720,39 +770,63 @@ def formatting_prompts_func(example): messages = [ {"role": "user", "content": user_text}, {"role": "assistant", "content": assistant_text}, - {"role": "system", "content": system_message_text} + {"role": "system", "content": system_message_text}, ] else: messages = [ {"role": "user", "content": user_text}, {"role": "assistant", "content": assistant_text}, ] - chat_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False) + chat_prompt = tokenizer.apply_chat_template( + messages, tokenize=False, add_generation_prompt=False + ) output_texts.append(chat_prompt) - + if not all(isinstance(text, str) for text in output_texts): raise ValueError("Formatted text must be a list of strings") - + # Ensure that there's at least one text to process if not output_texts: - return {'input_ids': [], 'attention_mask': []} - - tokenized_output = tokenizer(output_texts, truncation=True, padding='max_length', add_special_tokens=True, max_length=tokenizer.model_max_length) + return {"input_ids": [], 
"attention_mask": []} + + tokenized_output = tokenizer( + output_texts, + truncation=True, + padding="max_length", + add_special_tokens=True, + max_length=tokenizer.model_max_length, + ) return tokenized_output if tokenizer.chat_template: - collator = DataCollatorForCompletionOnlyLM(chat_response_temp, tokenizer=tokenizer) + collator = DataCollatorForCompletionOnlyLM( + chat_response_temp, tokenizer=tokenizer + ) else: - collator = DataCollatorForCompletionOnlyLM(response_template, tokenizer=tokenizer) - - promptTokenizedDataset = raw_datasets.map(formatting_prompts_func, batched=True, remove_columns=raw_datasets['train'].column_names) + collator = DataCollatorForCompletionOnlyLM( + response_template, tokenizer=tokenizer + ) + + promptTokenizedDataset = raw_datasets.map( + formatting_prompts_func, + batched=True, + remove_columns=raw_datasets["train"].column_names, + ) promptTokenizedDataset = promptTokenizedDataset.shuffle(len(promptTokenizedDataset)) # initialize the peft config if use_peft: if lora_config is None: - target_modules = ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj",] + target_modules = [ + "q_proj", + "k_proj", + "v_proj", + "o_proj", + "gate_proj", + "up_proj", + "down_proj", + ] lora_config = LoraConfig( r=16, @@ -762,28 +836,26 @@ def formatting_prompts_func(example): task_type="CAUSAL_LM", target_modules=target_modules, ) - + # initialize the model device, device_name = init_device() - if device_name == 'cuda': - model = AutoModelForCausalLM.from_pretrained(model_name, - token = hf_token) + if device_name == "cuda": + model = AutoModelForCausalLM.from_pretrained(model_name, token=hf_token) else: - model = AutoModelForCausalLM.from_pretrained(model_name, - token = hf_token) + model = AutoModelForCausalLM.from_pretrained(model_name, token=hf_token) model.resize_token_embeddings(len(tokenizer)) # if peft is enabled, use the peft model if use_peft: model = get_peft_model(model, peft_config=lora_config) - + if sft_config.deepspeed is None and use_zero: - raise ValueError('Zero optimization requires a DeepSpeed config') - + raise ValueError("Zero optimization requires a DeepSpeed config") + _is_distritubed = False if torch.cuda.device_count() > 1: if use_ddp and use_zero: - raise ValueError('Zero optimization and DDP cannot be used together') + raise ValueError("Zero optimization and DDP cannot be used together") if use_ddp: _is_distritubed = True if not dist.is_initialized(): @@ -792,7 +864,7 @@ def formatting_prompts_func(example): else: print("Process group already initialized") - sft_config.ddp_find_unused_parameters=False + sft_config.ddp_find_unused_parameters = False rank = dist.get_rank() device_id = rank % torch.cuda.device_count() model = model.to(device_id) @@ -804,40 +876,42 @@ def formatting_prompts_func(example): dist.init_process_group("nccl", world_size=torch.cuda.device_count()) else: print("Process group already initialized") - print('using ZeRO optimization') + print("using ZeRO optimization") else: # user warning not utilizing DDP or ZeRO for multi-gpu setup - print('[WARNING]: multiple GPUs detected, but not using DDP or ZeRO') + print("[WARNING]: multiple GPUs detected, but not using DDP or ZeRO") else: model.to(device) distributed = False - if device_name == 'mps': + if device_name == "mps": sft_config.fp16 = False sft_config.bf16 = False - - print(f"train data set is: {promptTokenizedDataset['train']}, eval dataset is {promptTokenizedDataset['test']}") + + print( + f"train data set is: {promptTokenizedDataset['train']}, 
eval dataset is {promptTokenizedDataset['test']}" + ) if do_split: trainer = SFTTrainer( - model, - tokenizer=tokenizer, - train_dataset=promptTokenizedDataset['train'], - eval_dataset=promptTokenizedDataset['test'], - args=sft_config, - formatting_func=formatting_prompts_func, - data_collator=collator + model, + tokenizer=tokenizer, + train_dataset=promptTokenizedDataset["train"], + eval_dataset=promptTokenizedDataset["test"], + args=sft_config, + formatting_func=formatting_prompts_func, + data_collator=collator, ) else: trainer = SFTTrainer( - model, - tokenizer=tokenizer, - train_dataset=promptTokenizedDataset, - args=sft_config, - formatting_func=formatting_prompts_func, - data_collator=collator + model, + tokenizer=tokenizer, + train_dataset=promptTokenizedDataset, + args=sft_config, + formatting_func=formatting_prompts_func, + data_collator=collator, ) # creating a directory in ouput dir for final model saving - output_dir_final = os.path.join(sft_config.output_dir, 'final_model') + output_dir_final = os.path.join(sft_config.output_dir, "final_model") if not os.path.exists(output_dir_final): os.makedirs(output_dir_final, exist_ok=True) @@ -846,16 +920,42 @@ def formatting_prompts_func(example): if _is_distritubed: dist.destroy_process_group() - - -def hf_sft(model_name:str, dataset_name:str='nlpie/pandemic_pact', - keys:list=[], template:str='', do_split:bool=True, split_ratio:float=0.2, load_eval_from_data:bool=False, - data:dict={}, num_epochs:int=3, batch_size:int=1, wandb_api_key:str='', - lr:float=5e-5, from_hf:bool=True, response_template:str='### Answer:', eval_steps:int=10, logging_steps:int=10, - use_peft:bool=False, peft_config=None, ddp:bool=False, zero:bool=True, deepspeed_config:str='home/ubuntu/src/zero_config.json', - hf_token:str='', gradient_accumulation_steps:int=1, fp16:bool=False, bf16:bool=False, report_to:str='none', - gradient_checkpointing:bool=False, max_seq_length:int=2048, use_wandb:bool=False, output_dir:str='sft_output', eval_accumulation_steps:int=8, wandb_config:wandbConfig=None): - + + +def hf_sft( + model_name: str, + dataset_name: str = "nlpie/pandemic_pact", + keys: list = [], + template: str = "", + do_split: bool = True, + split_ratio: float = 0.2, + load_eval_from_data: bool = False, + data: dict = {}, + num_epochs: int = 3, + batch_size: int = 1, + wandb_api_key: str = "", + lr: float = 5e-5, + from_hf: bool = True, + response_template: str = "### Answer:", + eval_steps: int = 10, + logging_steps: int = 10, + use_peft: bool = False, + peft_config=None, + ddp: bool = False, + zero: bool = True, + deepspeed_config: str = "home/ubuntu/src/zero_config.json", + hf_token: str = "", + gradient_accumulation_steps: int = 1, + fp16: bool = False, + bf16: bool = False, + report_to: str = "none", + gradient_checkpointing: bool = False, + max_seq_length: int = 2048, + use_wandb: bool = False, + output_dir: str = "sft_output", + eval_accumulation_steps: int = 8, + wandb_config: wandbConfig = None, +): """ Execute the SFT (Supervised Finetuning) process using Hugging Face Transformers. @@ -902,31 +1002,31 @@ def hf_sft(model_name:str, dataset_name:str='nlpie/pandemic_pact', - This function initializes a tokenizer, configures SFT parameters, loads the dataset, initializes the model, and starts training using the SFTTrainer. - If DDP and Zero optimization are enabled, they cannot be used simultaneously due to conflicting configurations. 
""" - + # Ensure no default process group exists if dist.is_initialized(): print("Destroying existing process group") dist.destroy_process_group() - + # deepspeed config, fix this for the time being - deepspeed_config='home/ubuntu/src/zero_config.json' - + deepspeed_config = "home/ubuntu/src/zero_config.json" + if response_template not in template: - raise ValueError('The response template must be in the template') - + raise ValueError("The response template must be in the template") + # script_path = os.path.dirname(os.path.realpath(__file__)) # output_dir = os.path.join(script_path, output_dir) - + # init wandb - if report_to == 'wandb': + if report_to == "wandb": wandb_api_key = wandb_config.api_key project = wandb_config.project config = wandb_config.config wandb.login(key=wandb_api_key) wandb.init(project=project, config=config) - + # initialize the tokenizer - tokenizer = AutoTokenizer.from_pretrained(model_name, token = hf_token) + tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.padding_side = "right" @@ -936,31 +1036,26 @@ def hf_sft(model_name:str, dataset_name:str='nlpie/pandemic_pact', if tokenizer.chat_template: # dummy messages to extract chat tokens messages = [ - { - "role": "system", - "content": "You are a helpful assistant." - }, - { - "role": "user", - "content": "Who won the world series in 2020?" - }, + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Who won the world series in 2020?"}, { "role": "assistant", - "content": "The Los Angeles Dodgers won the World Series in 2020." - } - ] + "content": "The Los Angeles Dodgers won the World Series in 2020.", + }, + ] text = tokenizer.apply_chat_template(messages, tokenize=False) - chat_temp = text.split(messages[1]['content'])[-1].split(messages[-1]['content'])[0] - chat_response_temp = chat_temp.replace(tokenizer.eos_token,'') - + chat_temp = text.split(messages[1]["content"])[-1].split( + messages[-1]["content"] + )[0] + chat_response_temp = chat_temp.replace(tokenizer.eos_token, "") if from_hf: try: - raw_datasets = load_dataset(dataset_name, token=False, split='train') + raw_datasets = load_dataset(dataset_name, token=False, split="train") except Exception as e: - print(f'Error: {e}') + print(f"Error: {e}") raw_datasets = load_dataset(dataset_name, token=False) - raw_datasets = raw_datasets['train'] + raw_datasets = raw_datasets["train"] if do_split: raw_datasets = raw_datasets.train_test_split(split_ratio) else: @@ -970,13 +1065,13 @@ def hf_sft(model_name:str, dataset_name:str='nlpie/pandemic_pact', def formatting_prompts_func(example): output_texts = [] - + if not tokenizer.chat_template: - for i in range(len(example[keys[0]])): - formatted_text = template.format( - **{key: example[key][i] for key in keys} - ) - output_texts.append(formatted_text) + for i in range(len(example[keys[0]])): + formatted_text = template.format( + **{key: example[key][i] for key in keys} + ) + output_texts.append(formatted_text) else: for i in range(len(example[keys[0]])): formatted_text = template.format( @@ -988,32 +1083,52 @@ def formatting_prompts_func(example): {"role": "user", "content": user_text}, {"role": "assistant", "content": assistant_text}, ] - chat_prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False) + chat_prompt = tokenizer.apply_chat_template( + messages, tokenize=False, add_generation_prompt=False + ) 
output_texts.append(chat_prompt) - + if not all(isinstance(text, str) for text in output_texts): raise ValueError("Formatted text must be a list of strings") - + # Ensure that there's at least one text to process if not output_texts: - return {'input_ids': [], 'attention_mask': []} - - tokenized_output = tokenizer(output_texts, truncation=True, padding='max_length', add_special_tokens=True) + return {"input_ids": [], "attention_mask": []} + + tokenized_output = tokenizer( + output_texts, truncation=True, padding="max_length", add_special_tokens=True + ) return tokenized_output if tokenizer.chat_template: - collator = DataCollatorForCompletionOnlyLM(chat_response_temp, tokenizer=tokenizer) + collator = DataCollatorForCompletionOnlyLM( + chat_response_temp, tokenizer=tokenizer + ) else: - collator = DataCollatorForCompletionOnlyLM(response_template, tokenizer=tokenizer) - - promptTokenizedDataset = raw_datasets.map(formatting_prompts_func, batched=True, remove_columns=raw_datasets['train'].column_names) + collator = DataCollatorForCompletionOnlyLM( + response_template, tokenizer=tokenizer + ) + + promptTokenizedDataset = raw_datasets.map( + formatting_prompts_func, + batched=True, + remove_columns=raw_datasets["train"].column_names, + ) promptTokenizedDataset = promptTokenizedDataset.shuffle(len(promptTokenizedDataset)) # initialize the peft config if use_peft: if not peft_config: - target_modules = ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj",] + target_modules = [ + "q_proj", + "k_proj", + "v_proj", + "o_proj", + "gate_proj", + "up_proj", + "down_proj", + ] peft_config = LoraConfig( r=16, @@ -1023,185 +1138,202 @@ def formatting_prompts_func(example): task_type="CAUSAL_LM", target_modules=target_modules, ) - + # initialize the model device, device_name = init_device() - if device_name == 'cuda': - model = AutoModelForCausalLM.from_pretrained(model_name, - token = hf_token, - attn_implementation="flash_attention_2", - torch_dtype=torch.float16) + if device_name == "cuda": + model = AutoModelForCausalLM.from_pretrained( + model_name, + token=hf_token, + attn_implementation="flash_attention_2", + torch_dtype=torch.float16, + ) else: - model = AutoModelForCausalLM.from_pretrained(model_name, - token = hf_token, - torch_dtype=torch.float32) + model = AutoModelForCausalLM.from_pretrained( + model_name, token=hf_token, torch_dtype=torch.float32 + ) model.resize_token_embeddings(len(tokenizer)) # if peft is enabled, use the peft model if use_peft: model = get_peft_model(model, peft_config=peft_config) - if torch.cuda.device_count() > 1: if ddp and zero: - raise ValueError('Zero optimization and DDP cannot be used together') + raise ValueError("Zero optimization and DDP cannot be used together") if ddp: if not dist.is_initialized(): print("Initializing process group for DDP") dist.init_process_group("nccl", world_size=torch.cuda.device_count()) else: print("Process group already initialized") - + rank = dist.get_rank() device_id = rank % torch.cuda.device_count() model = model.to(device_id) model = DDP(model, device_ids=[device_id]) distributed = True sft_config = SFTConfig( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - max_seq_length = max_seq_length, - report_to=report_to, - remove_unused_columns=False, - 
eval_steps=eval_steps, - eval_accumulation_steps=eval_accumulation_steps, - evaluation_strategy="steps", - logging_steps=logging_steps - ) + output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + max_seq_length=max_seq_length, + report_to=report_to, + remove_unused_columns=False, + eval_steps=eval_steps, + eval_accumulation_steps=eval_accumulation_steps, + evaluation_strategy="steps", + logging_steps=logging_steps, + ) elif zero: sft_config = SFTConfig( - output_dir="/home/ubuntu/src//tmp", - deepspeed="/home/ubuntu/src/zero_config.json", - per_device_train_batch_size = 1, - per_device_eval_batch_size = 1, - num_train_epochs= 1, - fp16=True, - learning_rate=2e-5, - gradient_accumulation_steps=4, - report_to='none', - gradient_checkpointing=True, - logging_dir="/home/ubuntu/src//chkp-pact", - max_seq_length = max_seq_length, - save_steps=50, - eval_steps=1, - eval_accumulation_steps=eval_accumulation_steps, - evaluation_strategy="steps", - logging_steps=logging_steps) + output_dir="/home/ubuntu/src//tmp", + deepspeed="/home/ubuntu/src/zero_config.json", + per_device_train_batch_size=1, + per_device_eval_batch_size=1, + num_train_epochs=1, + fp16=True, + learning_rate=2e-5, + gradient_accumulation_steps=4, + report_to="none", + gradient_checkpointing=True, + logging_dir="/home/ubuntu/src//chkp-pact", + max_seq_length=max_seq_length, + save_steps=50, + eval_steps=1, + eval_accumulation_steps=eval_accumulation_steps, + evaluation_strategy="steps", + logging_steps=logging_steps, + ) else: sft_config = SFTConfig( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - max_seq_length = max_seq_length, - report_to=report_to, - eval_steps=eval_steps, - eval_accumulation_steps=eval_accumulation_steps, - logging_steps=logging_steps, - evaluation_strategy="steps" - ) + output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + max_seq_length=max_seq_length, + report_to=report_to, + eval_steps=eval_steps, + eval_accumulation_steps=eval_accumulation_steps, + logging_steps=logging_steps, + evaluation_strategy="steps", + ) else: model.to(device) distributed = False - if device_name == 'mps': + if device_name == "mps": fp16 = False sft_config = SFTConfig( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=False, - bf16=False, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - max_seq_length = max_seq_length, - report_to=report_to, - remove_unused_columns=True, - eval_steps=eval_steps, - eval_accumulation_steps=eval_accumulation_steps, - logging_steps=logging_steps, - evaluation_strategy="steps" - ) - - print(f"train data set is: {promptTokenizedDataset['train']}, eval dataset is {promptTokenizedDataset['test']}") + 
output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=False, + bf16=False, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + max_seq_length=max_seq_length, + report_to=report_to, + remove_unused_columns=True, + eval_steps=eval_steps, + eval_accumulation_steps=eval_accumulation_steps, + logging_steps=logging_steps, + evaluation_strategy="steps", + ) + + print( + f"train data set is: {promptTokenizedDataset['train']}, eval dataset is {promptTokenizedDataset['test']}" + ) if do_split: trainer = SFTTrainer( - model, - tokenizer=tokenizer, - train_dataset=promptTokenizedDataset['train'], - eval_dataset=promptTokenizedDataset['test'], - args=sft_config, - formatting_func=formatting_prompts_func, - data_collator=collator + model, + tokenizer=tokenizer, + train_dataset=promptTokenizedDataset["train"], + eval_dataset=promptTokenizedDataset["test"], + args=sft_config, + formatting_func=formatting_prompts_func, + data_collator=collator, ) else: trainer = SFTTrainer( - model, - tokenizer=tokenizer, - train_dataset=promptTokenizedDataset, - args=sft_config, - formatting_func=formatting_prompts_func, - data_collator=collator + model, + tokenizer=tokenizer, + train_dataset=promptTokenizedDataset, + args=sft_config, + formatting_func=formatting_prompts_func, + data_collator=collator, ) # creating a directory in ouput dir for final model saving - output_dir_final = os.path.join(output_dir, 'final_model') + output_dir_final = os.path.join(output_dir, "final_model") if not os.path.exists(output_dir_final): os.makedirs(output_dir_final, exist_ok=True) - trainer.train() trainer.save_model(output_dir_final) if ddp: dist.destroy_process_group() + def clm_train( - model_name:str, dataset_name:str=None, hf_token:str='', dataset_config_name:str=None, data_from_hf:bool=True, - do_split:bool=True, split_ratio:float=0.2, use_peft:bool=False, lora_config:LoraConfig=None, - train_args:TrainingArguments=None, data:dict={}, wandb_config:wandbConfig=None, - use_ddp:bool=False, use_zero:bool=True, prompt_config:PromptConfig=None - ): + model_name: str, + dataset_name: str = None, + hf_token: str = "", + dataset_config_name: str = None, + data_from_hf: bool = True, + do_split: bool = True, + split_ratio: float = 0.2, + use_peft: bool = False, + lora_config: LoraConfig = None, + train_args: TrainingArguments = None, + data: dict = {}, + wandb_config: wandbConfig = None, + use_ddp: bool = False, + use_zero: bool = True, + prompt_config: PromptConfig = None, +): # Ensure no default process group exists if dist.is_initialized(): print("Destroying existing process group") dist.destroy_process_group() - + if use_ddp and use_zero: raise ValueError("Only one dist method is accepted at once.") - + if use_ddp: train_args.deepspeed = None if train_args is None: - raise ValueError('SFT config must be provided') - + raise ValueError("SFT config must be provided") + if data_from_hf and not dataset_name: - raise ValueError('Dataset name must be provided if data is from Hugging Face') - + raise ValueError("Dataset name must be provided if data is from Hugging Face") + # initialize the training arguments if train_args is None: - raise ValueError('SFT config must be provided') - + raise ValueError("SFT config must be provided") + if use_ddp and train_args.gradient_checkpointing: - print('[WARNING]: Gradient checkpointing is not supported with DDP. 
Disabling gradient checkpointing.') + print( + "[WARNING]: Gradient checkpointing is not supported with DDP. Disabling gradient checkpointing." + ) train_args.gradient_checkpointing = False # initialize the tokenizer - tokenizer = AutoTokenizer.from_pretrained(model_name, token = hf_token) + tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token) if tokenizer.pad_token is None: if prompt_config.new_padding_token: tokenizer.pad_token = prompt_config.new_padding_token @@ -1209,35 +1341,42 @@ def clm_train( tokenizer.pad_token = tokenizer.eos_token tokenizer.padding_side = "right" - - train_args.remove_unused_columns=False + train_args.remove_unused_columns = False if data_from_hf: try: if dataset_config_name: - raw_datasets = load_dataset(dataset_name, dataset_config_name, token=False, split='train') - raw_datasets = raw_datasets.rename_column(prompt_config.clm_column, 'text') + raw_datasets = load_dataset( + dataset_name, dataset_config_name, token=False, split="train" + ) + raw_datasets = raw_datasets.rename_column( + prompt_config.clm_column, "text" + ) else: - raw_datasets = load_dataset(dataset_name, token=False, split='train') - raw_datasets = raw_datasets.rename_column(prompt_config.clm_column, 'text') + raw_datasets = load_dataset(dataset_name, token=False, split="train") + raw_datasets = raw_datasets.rename_column( + prompt_config.clm_column, "text" + ) except Exception as e: - print(f'Error: {e}') + print(f"Error: {e}") if dataset_config_name: - raw_datasets = load_dataset(dataset_name, dataset_config_name, token=False) + raw_datasets = load_dataset( + dataset_name, dataset_config_name, token=False + ) else: raw_datasets = load_dataset(dataset_name, token=False) - raw_datasets = raw_datasets['train'] - raw_datasets = raw_datasets.rename_column(prompt_config.clm_column, 'text') + raw_datasets = raw_datasets["train"] + raw_datasets = raw_datasets.rename_column(prompt_config.clm_column, "text") if do_split: raw_datasets = raw_datasets.train_test_split(split_ratio) else: raw_datasets = Dataset.from_dict(data) if do_split: raw_datasets = raw_datasets.train_test_split(split_ratio) - + # tokenizing the dataset def tokenize(element): outputs = tokenizer( - element['text'], + element["text"], truncation=True, max_length=prompt_config.context_length, return_overflowing_tokens=True, @@ -1253,21 +1392,28 @@ def tokenize(element): data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False) tokenized_datasets = raw_datasets.map( - tokenize, batched=True, remove_columns=raw_datasets['train'].column_names + tokenize, batched=True, remove_columns=raw_datasets["train"].column_names ) - - if train_args.report_to == ['wandb']: + + if train_args.report_to == ["wandb"]: wandb_api_key = wandb_config.api_key project = wandb_config.project config = wandb_config.config wandb.login(key=wandb_api_key) wandb.init(project=project, config=config) - # initialize the peft config if use_peft: if lora_config is None: - target_modules = ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj",] + target_modules = [ + "q_proj", + "k_proj", + "v_proj", + "o_proj", + "gate_proj", + "up_proj", + "down_proj", + ] lora_config = LoraConfig( r=16, @@ -1277,25 +1423,23 @@ def tokenize(element): task_type="CAUSAL_LM", target_modules=target_modules, ) - + # initialize the model device, device_name = init_device() - if device_name == 'cuda': - model = AutoModelForCausalLM.from_pretrained(model_name, - token = hf_token) + if device_name == "cuda": + model = 
AutoModelForCausalLM.from_pretrained(model_name, token=hf_token) else: - model = AutoModelForCausalLM.from_pretrained(model_name, - token = hf_token) + model = AutoModelForCausalLM.from_pretrained(model_name, token=hf_token) model.resize_token_embeddings(len(tokenizer)) # if peft is enabled, use the peft model if use_peft: model = get_peft_model(model, peft_config=lora_config) - + _is_distritubed = False if torch.cuda.device_count() > 1: if use_ddp and use_zero: - raise ValueError('Zero optimization and DDP cannot be used together') + raise ValueError("Zero optimization and DDP cannot be used together") if use_ddp: _is_distritubed = True if not dist.is_initialized(): @@ -1304,7 +1448,7 @@ def tokenize(element): else: print("Process group already initialized") - train_args.ddp_find_unused_parameters=False + train_args.ddp_find_unused_parameters = False rank = dist.get_rank() device_id = rank % torch.cuda.device_count() model = model.to(device_id) @@ -1316,60 +1460,76 @@ def tokenize(element): dist.init_process_group("nccl", world_size=torch.cuda.device_count()) else: print("Process group already initialized") - print('using ZeRO optimization') + print("using ZeRO optimization") else: # user warning not utilizing DDP or ZeRO for multi-gpu setup - print('[WARNING]: multiple GPUs detected, but not using DDP or ZeRO') + print("[WARNING]: multiple GPUs detected, but not using DDP or ZeRO") else: model.to(device) distributed = False - if device_name == 'mps': + if device_name == "mps": train_args.fp16 = False train_args.bf16 = False - + # init training if do_split: trainer = Trainer( - model=model, - tokenizer=tokenizer, - args=train_args, - data_collator=data_collator, - train_dataset=tokenized_datasets["train"], - eval_dataset=tokenized_datasets["test"], + model=model, + tokenizer=tokenizer, + args=train_args, + data_collator=data_collator, + train_dataset=tokenized_datasets["train"], + eval_dataset=tokenized_datasets["test"], ) else: trainer = Trainer( - model=model, - tokenizer=tokenizer, - args=train_args, - data_collator=data_collator, - train_dataset=tokenized_datasets, + model=model, + tokenizer=tokenizer, + args=train_args, + data_collator=data_collator, + train_dataset=tokenized_datasets, ) # creating a directory in ouput dir for final model saving - output_dir_final = os.path.join(train_args.output_dir, 'final_model') + output_dir_final = os.path.join(train_args.output_dir, "final_model") if not os.path.exists(output_dir_final): os.makedirs(output_dir_final, exist_ok=True) - + trainer.train() trainer.save_model(output_dir_final) - - if use_ddp: - dist.destroy_process_group() - + if use_ddp: + dist.destroy_process_group() -def hf_clm_train(model_name:str='', dataset_name:str="", - context_length:int=128, data:list=[], - num_epochs:int=3, batch_size:int=8, fp16:bool=False, bf16:bool=False, - lr:float=5e-5, from_hf:bool=True, do_split:bool=True, split_ratio:float=0.2, - gradient_accumulation_steps:int=4, gradient_checkpointing:bool=False, - report_to:str='none', wandb_api_key:str='', wandb_config:wandbConfig=None, - use_peft:bool=False, peft_config=None, hf_token:str='', - hf_column:str='text', lr_scheduler_type:str='linear', eval_accumulation_steps:int=8, - output_dir:str='clm_output', ddp:bool=False, zero:bool=True): - +def hf_clm_train( + model_name: str = "", + dataset_name: str = "", + context_length: int = 128, + data: list = [], + num_epochs: int = 3, + batch_size: int = 8, + fp16: bool = False, + bf16: bool = False, + lr: float = 5e-5, + from_hf: bool = True, + do_split: bool = 
True, + split_ratio: float = 0.2, + gradient_accumulation_steps: int = 4, + gradient_checkpointing: bool = False, + report_to: str = "none", + wandb_api_key: str = "", + wandb_config: wandbConfig = None, + use_peft: bool = False, + peft_config=None, + hf_token: str = "", + hf_column: str = "text", + lr_scheduler_type: str = "linear", + eval_accumulation_steps: int = 8, + output_dir: str = "clm_output", + ddp: bool = False, + zero: bool = True, +): """ Train a causal language model using Hugging Face Transformers. @@ -1378,7 +1538,7 @@ def hf_clm_train(model_name:str='', dataset_name:str="", model_name : str The name or path of the pre-trained model to use. dataset_name : str, optional - The name of the dataset to use for training. + The name of the dataset to use for training. context_length : int, optional Maximum length of the input sequences. Default is 128. string_data : List[str], optional @@ -1435,20 +1595,28 @@ def hf_clm_train(model_name:str='', dataset_name:str="", tokenizer.padding_side = "right" # init wandb - if report_to == 'wandb': + if report_to == "wandb": wandb_api_key = wandb_config.api_key project = wandb_config.project config = wandb_config.config wandb.login(key=wandb_api_key) wandb.init(project=project, config=config) - # intialize the device + # intialize the device device, deivce_name = init_device() # initialize the peft config if use_peft: if not peft_config: - target_modules = ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj",] + target_modules = [ + "q_proj", + "k_proj", + "v_proj", + "o_proj", + "gate_proj", + "up_proj", + "down_proj", + ] peft_config = LoraConfig( r=16, @@ -1460,23 +1628,22 @@ def hf_clm_train(model_name:str='', dataset_name:str="", ) # load the dataset - if not from_hf: - raw_dataset = Dataset.from_dict({'text': data}) + if not from_hf: + raw_dataset = Dataset.from_dict({"text": data}) if do_split: raw_dataset = raw_dataset.train_test_split(split_ratio) else: try: - raw_dataset = load_dataset(dataset_name, token=False, split='train') + raw_dataset = load_dataset(dataset_name, token=False, split="train") except Exception as e: - print(f'Error: {e}, this is because the split parameter is not available') + print(f"Error: {e}, this is because the split parameter is not available") raw_dataset = load_dataset(dataset_name, token=False) - raw_dataset = raw_dataset.rename_column(hf_column, 'text') + raw_dataset = raw_dataset.rename_column(hf_column, "text") if do_split: raw_dataset = raw_dataset.train_test_split(split_ratio) - # initialize the model - model = AutoModelForCausalLM.from_pretrained(model_name, token = hf_token) + model = AutoModelForCausalLM.from_pretrained(model_name, token=hf_token) model.resize_token_embeddings(len(tokenizer)) # if peft is enabled, use the peft model @@ -1486,35 +1653,35 @@ def hf_clm_train(model_name:str='', dataset_name:str="", device, device_name = init_device() if torch.cuda.device_count() > 1: if ddp and zero: - raise ValueError('Zero optimization and DDP cannot be used together') - + raise ValueError("Zero optimization and DDP cannot be used together") + if ddp: if not dist.is_initialized(): print("Initializing process group for DDP") dist.init_process_group("nccl", world_size=torch.cuda.device_count()) else: print("Process group already initialized") - + rank = dist.get_rank() device_id = rank % torch.cuda.device_count() model = model.to(device_id) model = DDP(model, device_ids=[device_id]) distributed = True TrainArgs = TrainingArguments( - output_dir=output_dir, - 
per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - report_to=report_to, - remove_unused_columns=False, - lr_scheduler_type=lr_scheduler_type, - eval_accumulation_steps=eval_accumulation_steps - ) + output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + report_to=report_to, + remove_unused_columns=False, + lr_scheduler_type=lr_scheduler_type, + eval_accumulation_steps=eval_accumulation_steps, + ) elif zero: # sft_config = SFTConfig( # output_dir=output_dir, @@ -1531,65 +1698,64 @@ def hf_clm_train(model_name:str='', dataset_name:str="", # report_to=report_to # ) TrainArgs = TrainingArguments( - output_dir=output_dir, - deepspeed="/home/ubuntu/src/zero_config.json", - per_device_train_batch_size = 1, - per_device_eval_batch_size = 1, - num_train_epochs= 1, - fp16=True, - learning_rate=2e-5, - gradient_accumulation_steps=4, - report_to='none', - gradient_checkpointing=True, - remove_unused_columns=False, - logging_steps=20, - save_steps=50, - eval_steps=1, - lr_scheduler_type=lr_scheduler_type, - eval_accumulation_steps=eval_accumulation_steps - ) + output_dir=output_dir, + deepspeed="/home/ubuntu/src/zero_config.json", + per_device_train_batch_size=1, + per_device_eval_batch_size=1, + num_train_epochs=1, + fp16=True, + learning_rate=2e-5, + gradient_accumulation_steps=4, + report_to="none", + gradient_checkpointing=True, + remove_unused_columns=False, + logging_steps=20, + save_steps=50, + eval_steps=1, + lr_scheduler_type=lr_scheduler_type, + eval_accumulation_steps=eval_accumulation_steps, + ) else: TrainArgs = TrainingArguments( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - report_to=report_to, - remove_unused_columns=False, - lr_scheduler_type=lr_scheduler_type, - eval_accumulation_steps=eval_accumulation_steps - ) + output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + report_to=report_to, + remove_unused_columns=False, + lr_scheduler_type=lr_scheduler_type, + eval_accumulation_steps=eval_accumulation_steps, + ) else: model.to(device) distributed = False - if device_name == 'mps': + if device_name == "mps": fp16 = False bf16 = False TrainArgs = TrainingArguments( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - report_to=report_to, - remove_unused_columns=False, - lr_scheduler_type=lr_scheduler_type, - eval_accumulation_steps=eval_accumulation_steps - ) - + output_dir=output_dir, + 
per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + report_to=report_to, + remove_unused_columns=False, + lr_scheduler_type=lr_scheduler_type, + eval_accumulation_steps=eval_accumulation_steps, + ) def tokenize(element): outputs = tokenizer( - element['text'], + element["text"], truncation=True, max_length=context_length, return_overflowing_tokens=True, @@ -1605,40 +1771,45 @@ def tokenize(element): data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False) tokenized_datasets = raw_dataset.map( - tokenize, batched=True, remove_columns=raw_dataset['train'].column_names + tokenize, batched=True, remove_columns=raw_dataset["train"].column_names ) - trainer = Trainer( - model=model, - tokenizer=tokenizer, - args=TrainArgs, - data_collator=data_collator, - train_dataset=tokenized_datasets["train"], - eval_dataset=tokenized_datasets["test"], + model=model, + tokenizer=tokenizer, + args=TrainArgs, + data_collator=data_collator, + train_dataset=tokenized_datasets["train"], + eval_dataset=tokenized_datasets["test"], ) # creating a directory in ouput dir for final model saving - output_dir_final = os.path.join(output_dir, 'final_model') + output_dir_final = os.path.join(output_dir, "final_model") if not os.path.exists(output_dir_final): os.makedirs(output_dir_final, exist_ok=True) - + trainer.train() trainer.save_model(output_dir_final) - - if ddp: - dist.destroy_process_group() - + if ddp: + dist.destroy_process_group() -def hf_clf_multi_label_train(model_name:str, dataset_name:str='', - num_epochs:int=3, batch_size:int=8, - lr:float=5e-5, from_hf:bool=True, - inputs:list=[], labels:list=[], - use_peft:bool=False, peft_config=None, - accelerator=None, apply_class_weights:bool=False, - num_labels:int=0): +def hf_clf_multi_label_train( + model_name: str, + dataset_name: str = "", + num_epochs: int = 3, + batch_size: int = 8, + lr: float = 5e-5, + from_hf: bool = True, + inputs: list = [], + labels: list = [], + use_peft: bool = False, + peft_config=None, + accelerator=None, + apply_class_weights: bool = False, + num_labels: int = 0, +): device, deivce_name = init_device() tokenizer = AutoTokenizer.from_pretrained(model_name) @@ -1646,8 +1817,10 @@ def hf_clf_multi_label_train(model_name:str, dataset_name:str='', def tokenize_function(examples): return tokenizer(examples["text"], padding="max_length", truncation=True) - - model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels) + + model = AutoModelForSequenceClassification.from_pretrained( + model_name, num_labels=num_labels + ) model.config.pad_token_id = model.config.eos_token_id model.to(device) @@ -1655,168 +1828,180 @@ def tokenize_function(examples): # TODO: this is not working atm. 
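# The TODO above flags that class weighting is not wired in yet: `label_weights`
# is computed just below but never reaches the loss. A minimal, self-contained
# sketch (illustrative values only; assumes multi-hot labels stored as a NumPy
# array) of one way such weights could feed into the BCE loss via `pos_weight`:
import numpy as np
import torch
import torch.nn.functional as F

example_labels = np.array([[1, 0, 1], [0, 1, 1], [1, 1, 0]], dtype=np.float32)  # multi-hot labels
example_weights = 1 - example_labels.sum(axis=0) / example_labels.sum()         # same formula as the label_weights line below
pos_weight = torch.tensor(example_weights, dtype=torch.float32)

logits = torch.randn(3, 3)              # stand-in model outputs: batch of 3, 3 labels
targets = torch.tensor(example_labels)
# pos_weight scales the positive term per label, so under-represented labels count for more
loss = F.binary_cross_entropy_with_logits(logits, targets, pos_weight=pos_weight)
print(loss.item())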
if apply_class_weights: label_weights = 1 - labels.sum(axis=0) / labels.sum() - - dataset = Dataset.from_dict({'text': inputs, 'labels': labels}).shuffle(seed=42) + + dataset = Dataset.from_dict({"text": inputs, "labels": labels}).shuffle(seed=42) dataset = dataset.train_test_split(0.2) tokenized_datasets = dataset.map(tokenize_function, batched=True) - - if device == 'cuda': + + if device == "cuda": if torch.cuda.device_count() > 1: model = torch.nn.DataParallel(model) distributed = True - + small_train_dataset = tokenized_datasets["train"] small_eval_dataset = tokenized_datasets["test"] - model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels) + model = AutoModelForSequenceClassification.from_pretrained( + model_name, num_labels=num_labels + ) model.config.pad_token_id = model.config.eos_token_id - class CustomTrainer(Trainer): def __init__(self, **kwargs): super().__init__(**kwargs) - + def compute_loss(self, model, inputs, return_outputs=False): labels = inputs.pop("labels") - + # forward pass outputs = model(**inputs) logits = outputs.get("logits") - + # compute custom loss loss = F.binary_cross_entropy_with_logits(logits, labels.to(torch.float32)) return (loss, outputs) if return_outputs else loss - + training_args = TrainingArguments( - output_dir = 'multilabel_classification', - learning_rate = 1e-4, - per_device_train_batch_size = 8, - per_device_eval_batch_size = 8, - num_train_epochs = 10, - weight_decay = 0.01, - evaluation_strategy = 'epoch', - save_strategy = 'epoch', - load_best_model_at_end = True, - report_to='none' + output_dir="multilabel_classification", + learning_rate=1e-4, + per_device_train_batch_size=8, + per_device_eval_batch_size=8, + num_train_epochs=10, + weight_decay=0.01, + evaluation_strategy="epoch", + save_strategy="epoch", + load_best_model_at_end=True, + report_to="none", ) # define which metrics to compute for evaluation def compute_metrics(p): """ - Compute F1 scores for evaluation. - - Parameters: - ----------- - p : tuple - Tuple containing predictions and labels. - - Returns: - -------- - dict - Dictionary containing computed F1 scores: - - 'f1_micro': Micro-average F1 score. - - 'f1_macro': Macro-average F1 score. - - 'f1_weighted': Weighted-average F1 score. - """ + Compute F1 scores for evaluation. + + Parameters: + ----------- + p : tuple + Tuple containing predictions and labels. + + Returns: + -------- + dict + Dictionary containing computed F1 scores: + - 'f1_micro': Micro-average F1 score. + - 'f1_macro': Macro-average F1 score. + - 'f1_weighted': Weighted-average F1 score. 
+ """ predictions, labels = p - f1_micro = f1_score(labels, predictions > 0, average = 'micro') - f1_macro = f1_score(labels, predictions > 0, average = 'macro') - f1_weighted = f1_score(labels, predictions > 0, average = 'weighted') - return { - 'f1_micro': f1_micro, - 'f1_macro': f1_macro, - 'f1_weighted': f1_weighted - } + f1_micro = f1_score(labels, predictions > 0, average="micro") + f1_macro = f1_score(labels, predictions > 0, average="macro") + f1_weighted = f1_score(labels, predictions > 0, average="weighted") + return {"f1_micro": f1_micro, "f1_macro": f1_macro, "f1_weighted": f1_weighted} # train trainer = CustomTrainer( - model = model, - args = training_args, - train_dataset = small_train_dataset, - eval_dataset = small_eval_dataset, - tokenizer = tokenizer, - compute_metrics = compute_metrics, + model=model, + args=training_args, + train_dataset=small_train_dataset, + eval_dataset=small_eval_dataset, + tokenizer=tokenizer, + compute_metrics=compute_metrics, ) trainer.train() -def hf_clf_train(model_name:str, dataset_name:str='', hf_data_column:str='', hf_label_column:str='', - num_epochs:int=3, batch_size:int=8, - lr:float=5e-5, from_hf:bool=True, hf_token:str='', - inputs:list=[], labels:list=[], output_dir:str='clf_output', - use_peft:bool=False, peft_config=None, - report_to='none', wandb_api_key:str='', - ddp:bool=False, zero:bool=False, fp16:bool=False, bf16:bool=False, - gradient_accumulation_steps:int=1, gradient_checkpointing:bool=False): +def hf_clf_train( + model_name: str, + dataset_name: str = "", + hf_data_column: str = "", + hf_label_column: str = "", + num_epochs: int = 3, + batch_size: int = 8, + lr: float = 5e-5, + from_hf: bool = True, + hf_token: str = "", + inputs: list = [], + labels: list = [], + output_dir: str = "clf_output", + use_peft: bool = False, + peft_config=None, + report_to="none", + wandb_api_key: str = "", + ddp: bool = False, + zero: bool = False, + fp16: bool = False, + bf16: bool = False, + gradient_accumulation_steps: int = 1, + gradient_checkpointing: bool = False, +): + """ + Train a sequence classification model using Hugging Face transformers. + + This function trains a sequence classification model on a given dataset using + the specified Hugging Face model. It supports GPU acceleration, optional + PEFT (Performance Efficient Transfer) model integration, and optional logging + with Weights & Biases (WandB). The function handles dataset loading, tokenization, + model initialization, optimizer setup, and training loop execution. + + Parameters: + ----------- + model_name : str + The name or path of the pretrained Hugging Face model to use. + dataset_name : str, optional + Name of the dataset to load from the Hugging Face datasets library. + Default is an empty string, indicating that the dataset is provided + through inputs and labels directly. + num_epochs : int + Number of epochs to train the model. + batch_size : int + Batch size for training and evaluation. + lr : float + Learning rate for the optimizer. + from_hf : bool, optional + Whether to load the dataset from Hugging Face datasets library. + Default is True. + inputs : list, optional + List of input texts if loading dataset not from Hugging Face. + labels : list, optional + List of labels corresponding to inputs if loading dataset not from Hugging Face. + use_peft : bool, optional + Whether to apply Performance Efficient Transfer (PEFT) model. + peft_config : dict, optional + Configuration dictionary for PEFT model setup. 
+    hf_data_column : str, optional
+        Name of the dataset column holding the input text when loading from Hugging Face.
+    hf_label_column : str, optional
+        Name of the dataset column holding the labels when loading from Hugging Face.
+    hf_token : str, optional
+        Hugging Face access token used when downloading the model.
+    output_dir : str, optional
+        Directory where checkpoints and the final model are saved. Default is "clf_output".
+    report_to : str, optional
+        Logging backend passed to TrainingArguments, e.g. "wandb" or "none". Default is "none".
+    wandb_api_key : str, optional
+        Weights & Biases API key, used when report_to is "wandb".
+    ddp : bool, optional
+        Whether to wrap the model in DistributedDataParallel on multi-GPU machines.
+    zero : bool, optional
+        Whether to train with the DeepSpeed ZeRO configuration instead of DDP.
+    fp16 : bool, optional
+        Whether to train in float16 mixed precision.
+    bf16 : bool, optional
+        Whether to train in bfloat16 mixed precision.
+    gradient_accumulation_steps : int, optional
+        Number of steps to accumulate gradients before an optimizer update. Default is 1.
+    gradient_checkpointing : bool, optional
+        Whether to enable gradient checkpointing to reduce memory usage.
+
+    Returns:
+    --------
+    None
+
+    Raises:
+    -------
+    None
+
+    Examples:
+    ---------
+    # Example usage with a dataset from the Hugging Face Hub
+    hf_clf_train(model_name="bert-base-uncased", dataset_name="imdb",
+                 hf_data_column="text", hf_label_column="label",
+                 num_epochs=3, batch_size=8, lr=5e-5, from_hf=True)
+
+    # Example usage with custom inputs and labels
+    inputs = ["Sample input 1", "Sample input 2"]
+    labels = [0, 1]
+    hf_clf_train(model_name="bert-base-uncased", inputs=inputs, labels=labels,
+                 num_epochs=3, batch_size=8, lr=5e-5, from_hf=False)
+
+    Notes:
+    ------
+    - Ensure the Hugging Face model specified (`model_name`) supports sequence classification.
+    - This function supports GPU acceleration and distributed training if multiple GPUs are available.
+    - PEFT can be enabled to fine-tune only a small set of adapter parameters.
+    - Setting `report_to="wandb"` together with `wandb_api_key` enables logging of training metrics and progress.
     """
-Train a sequence classification model using Hugging Face transformers.
-
-This function trains a sequence classification model on a given dataset using
-the specified Hugging Face model. It supports GPU acceleration, optional
-PEFT (Performance Efficient Transfer) model integration, and optional logging
-with Weights & Biases (WandB). The function handles dataset loading, tokenization,
-model initialization, optimizer setup, and training loop execution.
-
-Parameters:
------------
-model_name : str
-    The name or path of the pretrained Hugging Face model to use.
-dataset_name : str, optional
-    Name of the dataset to load from the Hugging Face datasets library.
-    Default is an empty string, indicating that the dataset is provided
-    through inputs and labels directly.
-num_epochs : int
-    Number of epochs to train the model.
-batch_size : int
-    Batch size for training and evaluation.
-lr : float
-    Learning rate for the optimizer.
-from_hf : bool, optional
-    Whether to load the dataset from Hugging Face datasets library.
-    Default is True.
-inputs : list, optional
-    List of input texts if loading dataset not from Hugging Face.
-labels : list, optional
-    List of labels corresponding to inputs if loading dataset not from Hugging Face.
-use_peft : bool, optional
-    Whether to apply Performance Efficient Transfer (PEFT) model.
-peft_config : dict, optional
-    Configuration dictionary for PEFT model setup.
-accelerator : object, optional
-    Accelerator object for distributed training.
-use_wandb : bool, optional
-    Whether to log training progress using Weights & Biases (WandB).
-    Default is False.
-
-Returns:
---------
-None
-
-Raises:
--------
-None
-
-Examples:
----------
-# Example usage with a pretrained model from Hugging Face datasets library
-hf_clf_train(model_name="bert-base-uncased", dataset_name="glue", num_epochs=3,
-             batch_size=8, lr=5e-5, from_hf=True)
-
-# Example usage with custom inputs and labels
-inputs = ["Sample input 1", "Sample input 2"]
-labels = [0, 1]
-hf_clf_train(model_name="bert-base-uncased", inputs=inputs, labels=labels,
-             num_epochs=3, batch_size=8, lr=5e-5, from_hf=False)
-
-Notes:
-------
-- Ensure the Hugging Face model specified (`model_name`) supports sequence classification.
-- This function supports GPU acceleration and distributed training if multiple GPUs are available.
-- PEFT (Performance Efficient Transfer) can be enabled for optimizing model performance.
-- WandB integration (`use_wandb=True`) enables logging of training metrics and progress. -""" # Ensure no default process group exists if dist.is_initialized(): @@ -1829,13 +2014,13 @@ def hf_clf_train(model_name:str, dataset_name:str='', hf_data_column:str='', hf_ tokenizer.padding_side = "right" # init wandb - if report_to == 'wandb': + if report_to == "wandb": wandb.login(key=wandb_api_key) - wandb.init(project="clm_train", config={"model_name": model_name, - 'epochs': num_epochs}) + wandb.init( + project="clm_train", config={"model_name": model_name, "epochs": num_epochs} + ) device, deivce_name = init_device() - def tokenize_function(examples): try: @@ -1848,18 +2033,17 @@ def tokenize_function(examples): print(f"Error during tokenization: {e}") print(f"Input data: {examples['text']}") raise - - + if from_hf: dataset = load_dataset(dataset_name) # load the train split if exists - if 'train' in dataset: - dataset = dataset['train'] + if "train" in dataset: + dataset = dataset["train"] inputs = dataset[hf_data_column] labels = dataset[hf_label_column] - tokenized_datasets = Dataset.from_dict({'text':inputs, 'label':labels}) + tokenized_datasets = Dataset.from_dict({"text": inputs, "label": labels}) else: - tokenized_datasets = Dataset.from_dict({'text':inputs, 'label':labels}) + tokenized_datasets = Dataset.from_dict({"text": inputs, "label": labels}) print(tokenized_datasets) num_labels = len(set(labels)) @@ -1867,16 +2051,17 @@ def tokenize_function(examples): tokenized_datasets = tokenized_datasets.remove_columns(["text"]) tokenized_datasets.set_format("torch") tokenized_datasets = tokenized_datasets.rename_column("label", "labels") - + tokenized_datasets = tokenized_datasets.train_test_split(0.2) print(tokenized_datasets) - model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels, token = hf_token) + model = AutoModelForSequenceClassification.from_pretrained( + model_name, num_labels=num_labels, token=hf_token + ) model.config.pad_token_id = model.config.eos_token_id - - model.to(device) + model.to(device) distributed = False # if peft is enabled, use the peft model if use_peft: @@ -1889,101 +2074,100 @@ def tokenize_function(examples): task_type="CAUSAL_LM", ) model = get_peft_model(model, peft_config=peft_config) - device, device_name = init_device() if torch.cuda.device_count() > 1: if ddp and zero: - raise ValueError('Zero optimization and DDP cannot be used together') - + raise ValueError("Zero optimization and DDP cannot be used together") + if ddp: if not dist.is_initialized(): print("Initializing process group for DDP") dist.init_process_group("nccl", world_size=torch.cuda.device_count()) else: print("Process group already initialized") - + rank = dist.get_rank() device_id = rank % torch.cuda.device_count() model = model.to(device_id) model = DDP(model, device_ids=[device_id]) distributed = True TrainArgs = TrainingArguments( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - report_to=report_to, - remove_unused_columns=False, - ) + output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + 
gradient_checkpointing=gradient_checkpointing, + report_to=report_to, + remove_unused_columns=False, + ) elif zero: TrainArgs = TrainingArguments( - output_dir=output_dir, - deepspeed="/home/ubuntu/src/zero_config.json", - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - report_to=report_to, - gradient_checkpointing=gradient_checkpointing, - remove_unused_columns=False, - ) + output_dir=output_dir, + deepspeed="/home/ubuntu/src/zero_config.json", + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + report_to=report_to, + gradient_checkpointing=gradient_checkpointing, + remove_unused_columns=False, + ) else: TrainArgs = TrainingArguments( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - report_to=report_to, - remove_unused_columns=False, - ) + output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + report_to=report_to, + remove_unused_columns=False, + ) else: model.to(device) distributed = False - if device_name == 'mps': + if device_name == "mps": fp16 = False bf16 = False TrainArgs = TrainingArguments( - output_dir=output_dir, - per_device_train_batch_size = batch_size, - per_device_eval_batch_size = batch_size, - num_train_epochs= num_epochs, - fp16=fp16, - bf16=bf16, - learning_rate=lr, - gradient_accumulation_steps=gradient_accumulation_steps, - gradient_checkpointing=gradient_checkpointing, - report_to=report_to, - remove_unused_columns=False, - ) - + output_dir=output_dir, + per_device_train_batch_size=batch_size, + per_device_eval_batch_size=batch_size, + num_train_epochs=num_epochs, + fp16=fp16, + bf16=bf16, + learning_rate=lr, + gradient_accumulation_steps=gradient_accumulation_steps, + gradient_checkpointing=gradient_checkpointing, + report_to=report_to, + remove_unused_columns=False, + ) + trainer = Trainer( - model=model, - tokenizer=tokenizer, - args=TrainArgs, - train_dataset=tokenized_datasets["train"], - eval_dataset=tokenized_datasets["test"], + model=model, + tokenizer=tokenizer, + args=TrainArgs, + train_dataset=tokenized_datasets["train"], + eval_dataset=tokenized_datasets["test"], ) # creating a directory in ouput dir for final model saving - output_dir_final = os.path.join(output_dir, 'final_model') + output_dir_final = os.path.join(output_dir, "final_model") if not os.path.exists(output_dir_final): os.makedirs(output_dir_final, exist_ok=True) - + trainer.train() trainer.save_model(output_dir_final) - + if ddp: - dist.destroy_process_group() \ No newline at end of file + dist.destroy_process_group() diff --git a/simplifine_alpha/train_engine_client.py b/simplifine_alpha/train_engine_client.py index c3b62ef..0181d83 100644 --- a/simplifine_alpha/train_engine_client.py +++ b/simplifine_alpha/train_engine_client.py @@ -1,25 +1,27 @@ -''' - Simplfine is an easy-to-use, open-source 
library for fine-tuning LLMs models quickly on your own hardware or cloud. - Copyright (C) 2024 Simplifine Corp. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see . -''' +""" +Simplfine is an easy-to-use, open-source library for fine-tuning LLMs models quickly on your own hardware or cloud. +Copyright (C) 2024 Simplifine Corp. + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +""" + import requests import json from tqdm import tqdm -def send_train_query(query:dict={}, url:str=''): + +def send_train_query(query: dict = {}, url: str = ""): """ Send a training query to the server endpoint. @@ -58,21 +60,24 @@ def send_train_query(query:dict={}, url:str=''): - Ensure the server is running and accessible at the specified URL. - The headers are set to indicate JSON data. """ - _url = url + '/query' - headers = {'Content-Type': 'application/json'} # Set the headers to indicate JSON data - payload = query # Prepare the payload with the data list + _url = url + "/query" + headers = { + "Content-Type": "application/json" + } # Set the headers to indicate JSON data + payload = query # Prepare the payload with the data list try: response = requests.post(_url, headers=headers, data=json.dumps(payload)) response.raise_for_status() # Raise an HTTPError for bad responses (4xx and 5xx) - + # Return the JSON response from the server return response.json() - + except requests.exceptions.RequestException as e: # Handle any exceptions that occur during the request return {"error": str(e)} - -def get_company_status(api_key:str='', url:str=''): + + +def get_company_status(api_key: str = "", url: str = ""): """ Retrieve the company status from the server endpoint. @@ -112,57 +117,65 @@ def get_company_status(api_key:str='', url:str=''): - The headers are set to indicate JSON data. - The 'response' field is extracted from the server's JSON response. 
""" - _url = url + '/status' - headers = {'Content-Type': 'application/json'} # Set the headers to indicate JSON data - payload = {'api_key':api_key} # Prepare the payload with the data list + _url = url + "/status" + headers = { + "Content-Type": "application/json" + } # Set the headers to indicate JSON data + payload = {"api_key": api_key} # Prepare the payload with the data list try: response = requests.post(_url, headers=headers, data=json.dumps(payload)) response.raise_for_status() # Raise an HTTPError for bad responses (4xx and 5xx) - + # Return the JSON response from the server - return response.json()['response'] - + return response.json()["response"] + except requests.exceptions.RequestException as e: # Handle any exceptions that occur during the request return {"error": str(e)} -def get_job_log(api_key:str='', job_id:str='', url:str=''): - _url = url + '/job_output_log' - headers = {'Content-Type': 'application/json'} # Set the headers to indicate JSON data - payload = {'api_key':api_key, 'job_id':job_id} # Prepare the payload with the data list + +def get_job_log(api_key: str = "", job_id: str = "", url: str = ""): + _url = url + "/job_output_log" + headers = { + "Content-Type": "application/json" + } # Set the headers to indicate JSON data + payload = { + "api_key": api_key, + "job_id": job_id, + } # Prepare the payload with the data list try: response = requests.post(_url, headers=headers, data=json.dumps(payload)) response.raise_for_status() # Raise an HTTPError for bad responses (4xx and 5xx) - + # Return the JSON response from the server return response.json() - + except requests.exceptions.RequestException as e: # Handle any exceptions that occur during the request return {"error": str(e)} + def download_directory(api_key, job_id, save_path, url): - _url = url + '/download_model' - payload = { - "api_key": api_key, - "job_id": job_id - } - headers = { - 'Content-Type': 'application/json' - } + _url = url + "/download_model" + payload = {"api_key": api_key, "job_id": job_id} + headers = {"Content-Type": "application/json"} try: response = requests.post(_url, json=payload, headers=headers, stream=True) response.raise_for_status() # Get total file size from headers - total_size = int(response.headers.get('content-length', 0)) + total_size = int(response.headers.get("content-length", 0)) block_size = 8192 # Size of each block to be read - with open(save_path, 'wb') as file, tqdm( - total=total_size, unit='iB', unit_scale=True, desc="Downloading") as bar: + with ( + open(save_path, "wb") as file, + tqdm( + total=total_size, unit="iB", unit_scale=True, desc="Downloading" + ) as bar, + ): for chunk in response.iter_content(chunk_size=block_size): if chunk: file.write(chunk) @@ -172,23 +185,27 @@ def download_directory(api_key, job_id, save_path, url): except requests.exceptions.RequestException as e: print(f"Error occurred: {e}") -def stop_job(api_key:str='', job_id:str='', url:str=''): - _url = url + '/stop_job' - headers = {'Content-Type': 'application/json'} # Set the headers to indicate JSON data - payload = {'api_key':api_key, 'job_id':job_id} + +def stop_job(api_key: str = "", job_id: str = "", url: str = ""): + _url = url + "/stop_job" + headers = { + "Content-Type": "application/json" + } # Set the headers to indicate JSON data + payload = {"api_key": api_key, "job_id": job_id} try: response = requests.post(_url, headers=headers, data=json.dumps(payload)) response.raise_for_status() # Raise an HTTPError for bad responses (4xx and 5xx) - + # Return the JSON response from 
the server return response.json() except requests.exceptions.RequestException as e: return {"error": str(e)} -if __name__ == '__main__': + +if __name__ == "__main__": # Example usage of the functions # Set the URL of the server url = "http://18.189.220.250:5000" - api_key = 'SimpTest' + api_key = "SimpTest" status = get_company_status(api_key, url) - print(status) \ No newline at end of file + print(status)
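For reference, a minimal end-to-end sketch of the client helpers defined above. The URL, API key, job id and save path are placeholders (not a real server or credentials); the import path follows this file's location in the repository.

from simplifine_alpha.train_engine_client import (
    get_company_status,
    get_job_log,
    download_directory,
    stop_job,
)

url = "http://localhost:5000"   # assumed server address
api_key = "YOUR_API_KEY"        # placeholder credential
job_id = "example-job-id"       # hypothetical job identifier

print(get_company_status(api_key=api_key, url=url))            # status as returned by the server
print(get_job_log(api_key=api_key, job_id=job_id, url=url))    # training log for a given job
download_directory(api_key, job_id, "trained_model.bin", url)  # streams the server response to disk
stop_job(api_key=api_key, job_id=job_id, url=url)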
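The ZeRO branches in train_engine.py above pass a hard-coded deepspeed="/home/ubuntu/src/zero_config.json" path without showing that file. As a rough idea only (an assumption, not the project's actual config), a minimal ZeRO stage-2 file in the shape DeepSpeed expects, with "auto" values left for the Hugging Face Trainer to fill from TrainingArguments:

import json

zero_config = {
    "train_micro_batch_size_per_gpu": "auto",
    "gradient_accumulation_steps": "auto",
    "fp16": {"enabled": "auto"},
    "zero_optimization": {
        "stage": 2,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
    },
}

# Written next to the training script; the hard-coded path in the patch is machine-specific.
with open("zero_config.json", "w") as f:
    json.dump(zero_config, f, indent=2)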