diff --git a/coolprompt/assistant.py b/coolprompt/assistant.py index 3a70501..93e12af 100644 --- a/coolprompt/assistant.py +++ b/coolprompt/assistant.py @@ -7,7 +7,7 @@ from coolprompt.task_detector.detector import TaskDetector from coolprompt.data_generator.generator import SyntheticDataGenerator from coolprompt.language_model.llm import DefaultLLM -from coolprompt.optimizer.hype import hype_optimizer +from coolprompt.optimizer.hype import HyPEOptimizer, HyPEROptimizer from coolprompt.optimizer.reflective_prompt import reflectiveprompt from coolprompt.optimizer.distill_prompt.run import distillprompt from coolprompt.utils.logging_config import logger, set_verbose, setup_logging @@ -23,10 +23,6 @@ CLASSIFICATION_TASK_TEMPLATE, GENERATION_TASK_TEMPLATE, ) -from coolprompt.utils.prompt_templates.hype_templates import ( - CLASSIFICATION_TASK_TEMPLATE_HYPE, - GENERATION_TASK_TEMPLATE_HYPE, -) from coolprompt.utils.correction.corrector import correct from coolprompt.utils.correction.rule import LanguageRule from coolprompt.prompt_assistant.prompt_assistant import PromptAssistant @@ -36,12 +32,8 @@ class PromptTuner: """Prompt optimization tool supporting multiple methods.""" TEMPLATE_MAP = { - (Task.CLASSIFICATION, Method.HYPE): CLASSIFICATION_TASK_TEMPLATE_HYPE, - (Task.CLASSIFICATION, Method.REFLECTIVE): CLASSIFICATION_TASK_TEMPLATE, - (Task.CLASSIFICATION, Method.DISTILL): CLASSIFICATION_TASK_TEMPLATE, - (Task.GENERATION, Method.HYPE): GENERATION_TASK_TEMPLATE_HYPE, - (Task.GENERATION, Method.REFLECTIVE): GENERATION_TASK_TEMPLATE, - (Task.GENERATION, Method.DISTILL): GENERATION_TASK_TEMPLATE, + Task.CLASSIFICATION: CLASSIFICATION_TASK_TEMPLATE, + Task.GENERATION: GENERATION_TASK_TEMPLATE, } def __init__( @@ -102,7 +94,7 @@ def get_task_prompt_template(self, task: str, method: str) -> str: The type of task, either "classification" or "generation". method (str): Optimization method to use. 
- Available methods are: ['hype', 'reflective', 'distill'] + Available methods are: ['hype', 'reflective', 'distill', 'hyper'] Returns: str: The prompt template for the given task. @@ -113,7 +105,7 @@ def get_task_prompt_template(self, task: str, method: str) -> str: ) task = validate_task(task) method = validate_method(method) - return self.TEMPLATE_MAP[(task, method)] + return self.TEMPLATE_MAP[task] def _get_dataset_split( self, @@ -182,7 +174,7 @@ def run( target (Iterable): Target iterable object for autoprompting optimization. method (str): Optimization method to use. - Available methods are: ['hype', 'reflective', 'distill'] + Available methods are: ['hype', 'reflective', 'distill', 'hyper'] Defaults to hype. metric (str): Metric to use for optimization. problem_description (str): a string that contains @@ -297,7 +289,7 @@ def run( prompt=start_prompt, task=task, problem_description=problem_description, - num_samples=generate_num_samples + num_samples=generate_num_samples, ) self.synthetic_dataset = dataset self.synthetic_target = target @@ -329,10 +321,21 @@ def run( logger.debug(f"Additional kwargs: {kwargs}") if method is Method.HYPE: - final_prompt = hype_optimizer( + hype_opt = HyPEOptimizer(model=self._target_model) + final_prompt = hype_opt.optimize( + prompt=start_prompt, + meta_info={"task_description": problem_description}, + ) + elif method is Method.HYPER: + hyper_opt = HyPEROptimizer( model=self._target_model, + evaluator=evaluator, + **kwargs, + ) + final_prompt = hyper_opt.optimize( prompt=start_prompt, - problem_description=problem_description, + dataset_split=dataset_split, + meta_info={"task_description": problem_description}, ) elif method is Method.REFLECTIVE: final_prompt = reflectiveprompt( @@ -360,7 +363,7 @@ def run( ) logger.debug(f"Final prompt:\n{final_prompt}") - template = self.TEMPLATE_MAP[(task, method)] + template = self.TEMPLATE_MAP[task] logger.info(f"Evaluating on given dataset for {task} task...") self.init_metric = 
evaluator.evaluate( prompt=start_prompt, diff --git a/coolprompt/data_generator/generator.py b/coolprompt/data_generator/generator.py index c0f566d..ddaf673 100644 --- a/coolprompt/data_generator/generator.py +++ b/coolprompt/data_generator/generator.py @@ -1,7 +1,5 @@ -import json from typing import Optional, List, Tuple, Any -import dirtyjson from langchain_core.language_models.base import BaseLanguageModel from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages.ai import AIMessage @@ -52,11 +50,11 @@ def _generate( Returns: Any: generated data """ - if hasattr(self.model, 'model'): + if hasattr(self.model, "model"): wrapped_model = self.model.model else: wrapped_model = self.model - + if not isinstance(wrapped_model, BaseChatModel): output = self.model.invoke(request) if isinstance(output, AIMessage): diff --git a/coolprompt/evaluator/evaluator.py b/coolprompt/evaluator/evaluator.py index c6dfc5a..0888146 100644 --- a/coolprompt/evaluator/evaluator.py +++ b/coolprompt/evaluator/evaluator.py @@ -1,7 +1,7 @@ -import random -from langchain_core.language_models.base import BaseLanguageModel -from typing import Optional +from dataclasses import dataclass +from typing import List, Optional +from langchain_core.language_models.base import BaseLanguageModel from langchain_core.messages.ai import AIMessage from coolprompt.evaluator.metrics import BaseMetric from coolprompt.utils.logging_config import logger @@ -12,6 +12,22 @@ ) +@dataclass +class FailedExampleDetailed: + instance: str + assistant_answer: str + model_answer_parsed: Optional[str] = None + metric_value: float | int = 0.0 + ground_truth: str | int = "" + + +@dataclass +class EvalResultDetailed: + aggregate_score: float + score_per_task: List[float | int] = None + failed_examples: List[FailedExampleDetailed] = None + + class Evaluator: """Evaluator class to perform model evaluation using a specified metric. 
@@ -35,29 +51,17 @@ def evaluate( targets: list[str | int], template: Optional[str] = None, ) -> float: - """ - Evaluate the model on a dataset - by generating answers and computing the metric. - - For each sample in the dataset, - the prompt is concatenated with the sample, - passed to the model to generate an output, - and then all outputs are evaluated - against the targets using the metric. + """Evaluate the model on a dataset. Args: prompt (str): The prompt string to prepend to each dataset sample. dataset (list[str]): List of input samples to evaluate. - targets (list[str|int]): - Corresponding ground truth labels or references. - template (Optional[str]): - Prompt template for defined task type. - If None, uses default template. + targets (list[str|int]): Corresponding ground truth labels. + template (Optional[str]): Prompt template for defined task type. Returns: float: The computed evaluation metric score. """ - if template is None: template = self._get_default_template() @@ -80,28 +84,64 @@ def evaluate( return self.metric.compute(answers, targets, dataset) - def _get_full_prompt( + def evaluate_detailed( self, prompt: str, - sample: str, + dataset: list[str], + targets: list[str | int], template: Optional[str] = None, - ) -> str: - """Inserts parts of the prompt into the task template. + ) -> EvalResultDetailed: + """Evaluate the model and return detailed results per sample.""" + if template is None: + template = self._get_default_template() - Args: - prompt (str): the main instruction for the task - sample (str): the input sample - template (Optional[str]): - Prompt template for defined task type. - If None, uses default template. 
+ logger.info( + f"Evaluating (detailed) prompt for {self.task} task on {len(dataset)} samples" + ) + if self.task == Task.CLASSIFICATION: + self.metric.extract_labels(targets) + + answers = self.model.batch( + [ + self._get_full_prompt(prompt, sample, template) + for sample in dataset + ] + ) + answers = [ + a.content if isinstance(a, AIMessage) else a for a in answers + ] - Raises: - ValueError: if type of task is not supported + parsed_answers = [self.metric.parse_output(a) for a in answers] + aggregate_score, score_per_task = self.metric.compute_detailed( + answers, targets + ) - Returns: - str: the full prompt to be passed to the model - """ + failed_examples = [] + for i, score in enumerate(score_per_task): + if score == 0: + failed_examples.append( + FailedExampleDetailed( + instance=dataset[i], + assistant_answer=answers[i], + model_answer_parsed=parsed_answers[i], + metric_value=score, + ground_truth=targets[i], + ) + ) + + return EvalResultDetailed( + aggregate_score=aggregate_score, + score_per_task=score_per_task, + failed_examples=failed_examples, + ) + def _get_full_prompt( + self, + prompt: str, + sample: str, + template: Optional[str] = None, + ) -> str: + """Inserts parts of the prompt into the task template.""" if template is None: template = self._get_default_template() @@ -116,7 +156,6 @@ def _get_full_prompt( def _get_default_template(self) -> str: """Returns the default template for the task type.""" - match self.task: case Task.CLASSIFICATION: return CLASSIFICATION_TASK_TEMPLATE diff --git a/coolprompt/evaluator/metrics.py b/coolprompt/evaluator/metrics.py index 9c9b89f..bb815f7 100644 --- a/coolprompt/evaluator/metrics.py +++ b/coolprompt/evaluator/metrics.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import List, Optional, Tuple from deepeval.metrics import GEval from deepeval.test_case import LLMTestCase, LLMTestCaseParams @@ -86,7 +86,7 @@ def _compute_raw( self, outputs: list[str | int], 
targets: list[str | int], - dataset: Optional[list[str]] = None + dataset: Optional[list[str]] = None, ) -> float: """Compute metric value from preprocessed model answers. @@ -120,7 +120,7 @@ def compute( self, outputs: list[str | int], targets: list[str | int], - dataset: Optional[list[str]] = None + dataset: Optional[list[str]] = None, ) -> float: """Compute metric value from text model outputs @@ -134,9 +134,7 @@ def compute( """ output_labels = list( map( - lambda x: extract_answer( - x, self.ANS_TAGS, self.FORMAT_MISMATCH_LABEL - ), + lambda x: extract_answer(x, self.ANS_TAGS, self.FORMAT_MISMATCH_LABEL), outputs, ) ) @@ -144,9 +142,38 @@ def compute( encoded_output_labels, encoded_targets = self._encode_labels( output_labels, targets ) - return self._compute_raw( - encoded_output_labels, encoded_targets, dataset - ) + return self._compute_raw(encoded_output_labels, encoded_targets, dataset) + + def parse_output(self, output: str) -> str: + """Extract parsed answer from model output. + + Args: + output: Raw model output string. + + Returns: + Extracted answer from tags, or original output if not found. + """ + return extract_answer(output, self.ANS_TAGS, format_mismatch_label=output) + + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[float | int]]: + """Compute metric value per sample and aggregate. + + Returns: + Tuple of (aggregate_score, score_per_task). + score_per_task[i] - score for i-th sample. + aggregate_score - same as compute(). 
+ """ + score_per_task = [] + for o, t in zip(outputs, targets): + s = self._compute_raw([o], [t], dataset) + score_per_task.append(s) + aggregate = self.compute(outputs, targets, dataset) + return aggregate, score_per_task def __str__(self) -> str: return self._get_name() @@ -219,7 +246,7 @@ class GenerationMetric(BaseMetric): FORMAT_MISMATCH_LABEL = "" - def __init__(self): + def __init__(self, name=None): """Initialize metric""" super().__init__() @@ -316,6 +343,15 @@ def _compute_raw(self, outputs, targets, dataset): f1_list = super()._compute_raw(outputs, targets) return sum(f1_list) / len(f1_list) + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[float]]: + f1_list = super()._compute_raw(outputs, targets, dataset) + return sum(f1_list) / len(f1_list), f1_list + class LLMAsJudge(GenerationMetric): """LLM-as-a-judge metric for generation tasks.""" @@ -462,6 +498,21 @@ def _compute_raw(self, outputs, targets, dataset): outputs = [extract_number_from_text(item) for item in outputs] return float(mean([o == t for o, t in zip(outputs, targets)])) + def compute_detailed( + self, + outputs: list[str | int], + targets: list[str | int], + dataset: Optional[list[str]] = None, + ) -> Tuple[float, List[int]]: + targets = [extract_number_from_text(item) for item in targets] + outputs = [extract_number_from_text(item) for item in outputs] + score_per_task = [1 if o == t else 0 for o, t in zip(outputs, targets)] + return mean(score_per_task), score_per_task + + def parse_output(self, output: str) -> str: + extracted = extract_answer(output, self.ANS_TAGS, format_mismatch_label=output) + return extract_number_from_text(extracted) + def define_lang(outputs, targets): langs = [detect_language(target) for target in targets] @@ -469,8 +520,7 @@ def define_lang(outputs, targets): CLASSIFICATION_METRIC_NAME_MAPPING = { - metric._get_name(): metric - for metric in 
ClassificationMetric.__subclasses__() + metric._get_name(): metric for metric in ClassificationMetric.__subclasses__() } GENERATION_METRIC_NAME_MAPPING = { @@ -509,8 +559,9 @@ def validate_and_create_metric( return CLASSIFICATION_METRIC_NAME_MAPPING[metric]() error_msg = ( f"Invalid metric for {task} task: {metric}. " - f"Available metrics: {', '.join( - CLASSIFICATION_METRIC_NAME_MAPPING.keys())}." + f"Available metrics: { + ', '.join(CLASSIFICATION_METRIC_NAME_MAPPING.keys()) + }." ) logger.error(error_msg) raise ValueError(error_msg) @@ -544,8 +595,9 @@ def validate_and_create_metric( return GENERATION_METRIC_NAME_MAPPING[metric]() error_msg = ( f"Invalid metric for {task} task: {metric}. " - f"Available metrics: {', '.join( - GENERATION_METRIC_NAME_MAPPING.keys())}." + f"Available metrics: { + ', '.join(GENERATION_METRIC_NAME_MAPPING.keys()) + }." ) logger.error(error_msg) raise ValueError(error_msg) diff --git a/coolprompt/language_model/llm.py b/coolprompt/language_model/llm.py index 0b1e234..7b9feed 100644 --- a/coolprompt/language_model/llm.py +++ b/coolprompt/language_model/llm.py @@ -2,6 +2,7 @@ from langchain_huggingface import ChatHuggingFace, HuggingFacePipeline from langchain_core.language_models.base import BaseLanguageModel +from langchain_community.callbacks.manager import get_openai_callback from coolprompt.utils.logging_config import logger from coolprompt.utils.default import ( DEFAULT_MODEL_NAME, @@ -39,3 +40,77 @@ def init( model_kwargs={'dtype': 'float16'} ) return ChatHuggingFace(llm=llm) + + +class TrackedLLMWrapper: + """Простая обертка вокруг ChatOpenAI с трекингом""" + + def __init__(self, model, tracker): + self.model = model + self.tracker = tracker + + def invoke(self, input, **kwargs): + with get_openai_callback() as cb: + result = self.model.invoke(input, **kwargs) + self.tracker._update_stats(cb, True) + return result + + def batch(self, inputs, **kwargs): + with get_openai_callback() as cb: + results = self.model.batch(inputs, 
**kwargs) + self.tracker._update_stats(cb, False, batch_size=len(inputs)) + return results + + def reset_stats(self): + self.tracker.reset_stats() + + def get_stats(self): + return self.tracker.get_stats() + + # Проксируем остальные методы + def __getattr__(self, name): + return getattr(self.model, name) + + +class OpenAITracker: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._reset_stats() + return cls._instance + + def _reset_stats(self): + self.stats = { + "total_calls": 0, + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "total_cost": 0.0, + "invoke_calls": 0, + "batch_calls": 0, + "batch_items": 0, + } + + def _update_stats(self, callback, invoke_flag, **kwargs): + self.stats["total_calls"] += 1 + self.stats["total_tokens"] += callback.total_tokens + self.stats["prompt_tokens"] += callback.prompt_tokens + self.stats["completion_tokens"] += callback.completion_tokens + self.stats["total_cost"] += callback.total_cost + if invoke_flag: + self.stats["invoke_calls"] += 1 + else: + self.stats["batch_calls"] += 1 + self.stats["batch_items"] += kwargs.get("batch_size", 0) + + def wrap_model(self, model): + """Обертывает модель для трекинга""" + return TrackedLLMWrapper(model, self) + + def get_stats(self): + return self.stats.copy() + + def reset_stats(self): + self._reset_stats() diff --git a/coolprompt/optimizer/hype/__init__.py b/coolprompt/optimizer/hype/__init__.py index c0ebaa4..f2aa268 100644 --- a/coolprompt/optimizer/hype/__init__.py +++ b/coolprompt/optimizer/hype/__init__.py @@ -1,5 +1,8 @@ -from coolprompt.optimizer.hype.hype import hype_optimizer +from coolprompt.optimizer.hype.hype import HyPEOptimizer, Optimizer +from coolprompt.optimizer.hype.hyper import HyPEROptimizer __all__ = [ - 'hype_optimizer' + "Optimizer", + "HyPEOptimizer", + "HyPEROptimizer", ] diff --git a/coolprompt/optimizer/hype/feedback_module.py 
b/coolprompt/optimizer/hype/feedback_module.py new file mode 100644 index 0000000..1b97bc0 --- /dev/null +++ b/coolprompt/optimizer/hype/feedback_module.py @@ -0,0 +1,182 @@ +"""FeedbackModule for generating prompt improvement recommendations.""" + +import random +from typing import Any, List, Optional + +from coolprompt.evaluator.evaluator import FailedExampleDetailed +from coolprompt.utils.parsing import extract_json, get_model_answer_extracted + + +FEEDBACK_PROMPT_TEMPLATE = """You are an expert prompt engineer. + +The prompt was evaluated on a benchmark task and failed on some examples. You will be given a prompt and an example. + +Prompt: + +{prompt} + + +Failed task: + +{instance} + + +Model answer (raw): + +{model_answer} + + +Model answer (parsed): + +{model_answer_parsed} + + +Metric value: {metric_value} + +Correct answer: + +{ground_truth} + + +Identify the core reasoning error pattern. + +Give ONE general, universal recommendation to improve the prompt (no task-specific details). + +Format: Concise, max 20-25 words, starts with action verb. Output nothing but the actual recommendation. Avoid meta‑comments (e.g., "similar to…", "as before…") – the recommendation must stand alone. + +Example: "Require step-by-step reasoning before classifying." + +Recommendation: +""" + +FILTER_RECOMMENDATIONS_PROMPT = """You have a list of recommendations for prompt improvement: + +{recommendations} + +TASK: +1. Group them into conceptual clusters (similar ideas). +2. For each cluster, **synthesize a single, new recommendation** that captures the essence of all items in that cluster. Do not just copy an existing one. +3. Rank clusters by size (largest first). If some clusters conflict - drop the smaller ones. +4. Output ONLY a JSON array of the synthesized recommendations, in rank order. 
+ +GOOD EXAMPLES: +Input: ["step-by-step", "break down calc", "don't show work", "format clearly"] +Correct output: ["Require detailed step-by-step reasoning with calculations", "Specify the desired output format explicitly"] +Why good: +- Captured main ideas of reasoning cluster into 1 strong rec +- Didn't lose the cluster from "format clearly" +- Resolved conflict: "don't show work" is less frequent recommendation, so its cluster was dropped + +BAD EXAMPLES: +Input: ["Focus on clarifying the output format requirements", + "Add examples of expected responses to the prompt", + "Make sure to specify exact sentiment labels", + "Include examples to avoid confusion with similar labels", + "Focus on tone analysis in the text", + "Clarify what constitutes positive vs negative", + "Add examples of positive responses", + "Similar to previous - add more examples"] +Wrong output: ["Similar to previous - add more examples", "Add examples of positive responses", "Make sure to specify exact sentiment labels", "Focus on tone analysis in the text"] +Why bad: +- "Similar to previous" = meta-trash +- No synthesis of 6+ example recs into 1 strong rec, uses only existing recommendations +- Two different recommendations with a similar intent: adding examples (duplicates) +""" + + +class FeedbackModule: + """Generates recommendations for improving prompts based on failed examples.""" + + def __init__(self, model: Any) -> None: + self.model = model + + def generate_recommendation( + self, + prompt: str, + instance: str, + model_answer: str, + model_answer_parsed: Optional[str] = None, + metric_value: float | int = 0.0, + ground_truth: str | int = "", + ) -> str: + """Generate a single recommendation for a failed example. + + Args: + prompt: The original prompt that was used. + instance: The task instance (input/question). + model_answer: The model's answer (incorrect, raw). + model_answer_parsed: The model's parsed answer (for metric calculation). 
+ metric_value: The metric value for this answer. + ground_truth: The correct answer. + + Returns: + A recommendation string for improving the prompt. + """ + formatted_prompt = FEEDBACK_PROMPT_TEMPLATE.format( + prompt=prompt, + instance=instance, + model_answer=model_answer, + model_answer_parsed=model_answer_parsed or "", + metric_value=metric_value, + ground_truth=ground_truth, + ) + result = get_model_answer_extracted(self.model, formatted_prompt) + return self._process_output(result) + + def generate_recommendations( + self, + prompt: str, + failed_examples: List[FailedExampleDetailed], + ) -> List[str]: + """Generate recommendations for all failed examples. + + Args: + prompt: The original prompt that was used. + failed_examples: List of failed examples. + + Returns: + List of recommendation strings. + """ + return [ + self.generate_recommendation( + prompt=prompt, + instance=fe.instance, + model_answer=fe.assistant_answer, + model_answer_parsed=fe.model_answer_parsed, + metric_value=fe.metric_value, + ground_truth=fe.ground_truth, + ) + for fe in failed_examples + ] + + def filter_recommendations(self, recommendations: List[str]) -> List[str]: + """Filter and deduplicate recommendations using LLM. + + Args: + recommendations: List of recommendation strings. + + Returns: + Deduplicated and filtered list of recommendations. + """ + if not recommendations: + return [] + + formatted_recs = "\n".join( + f"{i + 1}. 
{rec}" for i, rec in enumerate(recommendations) + ) + prompt = FILTER_RECOMMENDATIONS_PROMPT.format( + recommendations=formatted_recs + ) + result = get_model_answer_extracted(self.model, prompt) + try: + data = extract_json(result) + if data and isinstance(data, list): + return [str(x) for x in data] + except Exception: + pass + + return random.sample(recommendations, min(3, len(recommendations))) + + def _process_output(self, output: Any) -> str: + """Process model output to extract recommendation.""" + return output if isinstance(output, str) else str(output) diff --git a/coolprompt/optimizer/hype/hype.py b/coolprompt/optimizer/hype/hype.py index b96f2d5..c71a0bc 100644 --- a/coolprompt/optimizer/hype/hype.py +++ b/coolprompt/optimizer/hype/hype.py @@ -1,47 +1,115 @@ -from langchain_core.language_models.base import BaseLanguageModel +from abc import ABC, abstractmethod +from typing import Any, List, Optional, Union -from coolprompt.utils.logging_config import logger -from coolprompt.utils.prompt_templates.hype_templates import ( - HYPE_PROMPT_TEMPLATE, -) -from coolprompt.utils.parsing import ( - extract_answer, - get_model_answer_extracted, - safe_template, +from coolprompt.utils.parsing import extract_answer, get_model_answer_extracted +from coolprompt.utils.prompt_templates.hyper_templates import ( + HypeMetaPromptBuilder, + HypeMetaPromptConfig, + META_INFO_SECTION, + META_PROMPT_SECTIONS, ) -INSTRUCTIVE_PROMPT_TAGS = ("[PROMPT_START]", "[PROMPT_END]") +def _build_full_meta_prompt_template(builder: HypeMetaPromptBuilder) -> str: + body = builder.build_meta_prompt() + return ( + body + + "\n\nUser query:\n\n{QUERY}\n\n" + + "{META_INFO_BLOCK}" + ) -def hype_optimizer( - model: BaseLanguageModel, prompt: str, problem_description: str -) -> str: - """Rewrites prompt by injecting it - into predefined template and querying LLM. - Args: - model (BaseLanguageModel): Any LangChain BaseLanguageModel instance. - prompt (str): Input prompt to optimize. 
- problem_description (str): Brief description of the task, explaining - its domain. - Returns: - str: LLM-generated rewritten prompt. - """ +class Optimizer(ABC): + def __init__(self, model): + self.model = model - logger.info("Running HyPE optimization...") - logger.debug(f"Start prompt:\n{prompt}") + @abstractmethod + def optimize(self): + pass - query = safe_template( - HYPE_PROMPT_TEMPLATE, - PROBLEM_DESCRIPTION=problem_description, - QUERY=prompt, - ) - answer = get_model_answer_extracted(model, query) +class HyPEOptimizer(Optimizer): + def __init__( + self, model, config: Optional[HypeMetaPromptConfig] = None + ) -> None: + super().__init__(model) + self.builder = HypeMetaPromptBuilder(config) + self.meta_prompt = _build_full_meta_prompt_template(self.builder) - logger.info("HyPE optimization completed") - logger.debug(f"Raw HyPE output:\n{answer}") + def get_section(self, name: str) -> Any: + """Returns the current value of the section (for recommendations — List[str]).""" + if name not in META_PROMPT_SECTIONS: + raise ValueError( + f"Unknown section: {name}. Expected: {META_PROMPT_SECTIONS}" + ) + if name == "recommendations": + return list(self.builder.config.recommendations) + if name == "constraints": + return list(self.builder.config.constraints) + return self.builder.get_cached_section(name) - return extract_answer( - answer, INSTRUCTIVE_PROMPT_TAGS, format_mismatch_label=answer - ) + def update_section( + self, + name: str, + value: Union[str, List[str]], + ) -> None: + """Updates the section and rebuilds the meta-prompt.""" + if name not in META_PROMPT_SECTIONS: + raise ValueError( + f"Unknown section: {name}. 
Expected: {META_PROMPT_SECTIONS}" + ) + if name == "recommendations": + self.builder.config.recommendations = list(value) + elif name == "constraints": + self.builder.config.constraints = list(value) + elif name == "output_format" and isinstance(value, str): + self.builder.config.output_format_section = value + else: + raise ValueError( + f"update_section for {name}: unsupported value type" + ) + self.builder.rebuild_all_sections() + self._rebuild_meta_prompt() + + def _rebuild_meta_prompt(self) -> None: + self.meta_prompt = _build_full_meta_prompt_template(self.builder) + + def set_meta_prompt(self, meta_prompt: str) -> None: + self.meta_prompt = meta_prompt + + def optimize( + self, + prompt: str, + meta_info: Optional[dict[str, Any]] = None, + n_prompts: int = 1, + ) -> Union[str, List[str]]: + query = self._format_meta_prompt(prompt, **(meta_info or {})) + raw_result = get_model_answer_extracted(self.model, query, n=n_prompts) + if n_prompts == 1: + return self._process_model_output(raw_result) + return [self._process_model_output(r) for r in raw_result] + + def _format_meta_prompt(self, prompt: str, **kwargs) -> str: + if kwargs: + meta_info_content = "\n".join( + [f"{k}: {v}" for k, v in kwargs.items()] + ) + meta_info_block = META_INFO_SECTION.format( + meta_info_content=meta_info_content + ) + else: + meta_info_block = "" + + return self.meta_prompt.format( + QUERY=prompt, META_INFO_BLOCK=meta_info_block + ) + + RESULT_PROMPT_TAGS = ("", "") + + def _process_model_output(self, output: Any) -> str: + result = extract_answer( + output, + self.RESULT_PROMPT_TAGS, + format_mismatch_label=output, + ) + return result if isinstance(result, str) else str(result) diff --git a/coolprompt/optimizer/hype/hyper.py b/coolprompt/optimizer/hype/hyper.py new file mode 100644 index 0000000..dae910f --- /dev/null +++ b/coolprompt/optimizer/hype/hyper.py @@ -0,0 +1,220 @@ +"""HyPEROptimizer: HyPE with iterative refinement via recommendations.""" + +import random +from typing 
import Any, List, Optional, Sequence, Tuple + +from tqdm import tqdm + +from coolprompt.optimizer.hype.hype import HyPEOptimizer, Optimizer +from coolprompt.optimizer.hype.feedback_module import FeedbackModule +from coolprompt.utils.parsing import get_model_answer_extracted +from coolprompt.evaluator.evaluator import ( + Evaluator, + EvalResultDetailed, +) + + +def sample_mini_batch( + dataset: Sequence[str], + targets: Sequence[str | int], + size: int, + seed: Optional[int] = None, +) -> Tuple[List[str], List[str | int]]: + """Sample a mini-batch from the dataset. + + Returns: + (samples, targets) - lists of length size (or less if dataset is smaller). + """ + import random + + rng = random.Random(seed) + n = min(size, len(dataset)) + indices = rng.sample(range(len(dataset)), n) + return ( + [dataset[i] for i in indices], + [targets[i] for i in indices], + ) + + +def compute_pareto_front( + candidates: List[str], + results: List[EvalResultDetailed], +) -> List[Tuple[str, EvalResultDetailed]]: + """Compute Pareto front from candidates based on score_per_task. + + A candidate dominates another if its score_per_task >= other.score_per_task + for all tasks and > for at least one. + + Returns: + List of (candidate, result) that belong to the Pareto front. 
+ """ + n = len(candidates) + is_pareto = [True] * n + + for i in range(n): + if not is_pareto[i]: + continue + for j in range(n): + if i == j or not is_pareto[j]: + continue + # Check if i dominates j + i_scores = results[i].score_per_task + j_scores = results[j].score_per_task + if not i_scores or not j_scores: + continue + if len(i_scores) != len(j_scores): + continue + i_dominates_j = all( + i_s >= j_s for i_s, j_s in zip(i_scores, j_scores) + ) and any(i_s > j_s for i_s, j_s in zip(i_scores, j_scores)) + if i_dominates_j: + is_pareto[j] = False + + return [(candidates[i], results[i]) for i in range(n) if is_pareto[i]] + + +class HyPEROptimizer(Optimizer): + """HyPE with iterative refinement via evaluation-based recommendations.""" + + def __init__( + self, + model: Any, + evaluator: Evaluator, + *, + n_iterations: int = 5, + patience: int = None, + n_candidates: int = 3, + top_n_candidates: int = 3, + k_samples: int = 3, + mini_batch_size: int = 16, + ) -> None: + super().__init__(model) + self.hype_module = HyPEOptimizer(model) + self.evaluator = evaluator + self.feedback_module = FeedbackModule(model) + self.n_iterations = n_iterations + self.patience = patience + self.n_candidates = n_candidates + self.top_n_candidates = top_n_candidates + self.k_samples = k_samples + self.mini_batch_size = mini_batch_size + + def _get_variants_from_best( + self, best_prompt: str, n_candidates: int + ) -> List[str]: + paraphrase_prompt = f"""Generate an alternative version of the following prompt. The new version must: +- Use different words, sentence structure, and tone (e.g., more formal, casual, or creative). +- Preserve the original meaning, key details, and language. +- Vary in length: slightly shorter or longer (up to 10%). +- Feel natural and coherent. +- Output only the text of the alternative prompt, without any additional commentary or formatting. 
+ +Original prompt: +{best_prompt} + +Alternative prompt:""" + raw_result = get_model_answer_extracted( + self.model, paraphrase_prompt, n=n_candidates, temperature=0.9 + ) + return [best_prompt] + [ + self._process_model_output(r) for r in raw_result + ] + + def _process_model_output(self, output: Any) -> str: + return output if isinstance(output, str) else str(output) + + def optimize( + self, + prompt: str, + dataset_split: Tuple[ + Sequence[str], Sequence[str], Sequence[str], Sequence[str] + ], + meta_info: Optional[dict[str, Any]] = None, + ) -> str: + """Generate candidates, evaluate, update recommendations, repeat.""" + train_samples, val_samples, train_targets, val_targets = dataset_split + best_prompt = prompt + best_score = self.evaluator.evaluate( + prompt, list(val_samples), list(val_targets) + ) + patience_counter = 0 + + for iteration in tqdm( + range(self.n_iterations), desc="HyPER iterations" + ): + # 1. Generate candidates from best_prompt + candidates = self._get_variants_from_best( + best_prompt, n_candidates=self.n_candidates + ) + + if not candidates: + return best_prompt + + # 2. Mini-batch from train + samples, sample_targets = sample_mini_batch( + train_samples, train_targets, self.mini_batch_size + ) + if not samples: + continue + + # 3. Evaluate candidates on mini-batch via evaluate_detailed + results: List[EvalResultDetailed] = [ + self.evaluator.evaluate_detailed(cand, samples, sample_targets) + for cand in candidates + ] + + # 4. Pareto front + pareto_front = compute_pareto_front(candidates, results) + + # Fallback: if all candidates are in front, sort by aggregate_score + if len(pareto_front) == len( + candidates + ) and self.top_n_candidates < len(candidates): + scored = sorted( + zip(candidates, results), + key=lambda x: x[1].aggregate_score, + reverse=True, + ) + pareto_front = scored[: self.top_n_candidates] + + if not pareto_front: + continue + + # 5. 
Collect recommendations for all candidates from Pareto front + all_recs: List[str] = [] + for cand_prompt, res in pareto_front: + failed_sample = random.sample( + res.failed_examples, + min(self.k_samples, len(res.failed_examples)), + ) + recs = self.feedback_module.generate_recommendations( + cand_prompt, failed_sample + ) + all_recs.extend(recs) + + # Filter and update recommendations + all_recs = self.feedback_module.filter_recommendations(all_recs) + + self.hype_module.update_section("recommendations", all_recs) + + # 6. For each candidate from Pareto front + for cand_prompt, res in pareto_front: + optimized_prompt = self.hype_module.optimize( + cand_prompt, meta_info=meta_info + ) + + val_score = self.evaluator.evaluate( + optimized_prompt, list(val_samples), list(val_targets) + ) + + if val_score > best_score: + best_score = val_score + best_prompt = optimized_prompt + patience_counter = 0 + else: + patience_counter += 1 + + if self.patience and patience_counter >= self.patience: + break + + return best_prompt diff --git a/coolprompt/optimizer/hype/hyper_refine.py b/coolprompt/optimizer/hype/hyper_refine.py new file mode 100644 index 0000000..9238f6d --- /dev/null +++ b/coolprompt/optimizer/hype/hyper_refine.py @@ -0,0 +1,183 @@ +"""HyPEROptimizer: HyPE with iterative refinement via feedback.""" + +from dataclasses import dataclass, field +from typing import Any, List, Optional, Sequence, Tuple + +from coolprompt.optimizer.hype.hyper import HyPEOptimizer, Optimizer + + +# --- Structures --- + + +@dataclass +class FailedExample: + """Один неудачный пример для формирования рекомендаций. + + Отдаётся Evaluator при детальной оценке. 
+ """ + + instance: str # инстанс из датасета + assistant_answer: str + metric_value: float # значение метрики для этого примера + ground_truth: str | int # целевой ответ + + +@dataclass +class EvalResult: + """Результат оценки кандидата на мини-батче.""" + + aggregate_score: float + failed_examples: List[FailedExample] = field(default_factory=list) + + +# --- Stubs --- + + +def sample_mini_batch( + dataset: Sequence[str], + targets: Sequence[str | int], + size: int, + seed: Optional[int] = None, +) -> Tuple[List[str], List[str | int]]: + """Сэмплирует мини-батч из датасета. + + Returns: + (samples, targets) — списки длины size (или меньше, если датасет меньше). + """ + import random + + rng = random.Random(seed) + n = min(size, len(dataset)) + indices = rng.sample(range(len(dataset)), n) + return ( + [dataset[i] for i in indices], + [targets[i] for i in indices], + ) + + +def _evaluate_candidate_stub( + prompt: str, + dataset: List[str], + targets: List[str | int], +) -> EvalResult: + """Заглушка Evaluator: оценивает кандидата на мини-батче. + + TODO: подключить coolprompt.evaluator.Evaluator. + """ + return EvalResult( + aggregate_score=0.0, + failed_examples=[ + FailedExample( + instance=dataset[i], + assistant_answer="", + metric_value=0.0, + ground_truth=targets[i], + ) + for i in range(min(3, len(dataset))) + ], + ) + + +def _feedback_module_stub( + failed_examples: List[FailedExample], + k_samples: int, +) -> List[str]: + """Заглушка FeedbackModule: по неудачным примерам выдаёт рекомендации. + + TODO: реализовать LLM-based feedback. + """ + return [f"Consider improving based on example: {fe.instance[:50]}..." for fe in failed_examples[:k_samples]] + + +def filter_recommendations(recommendations: List[str]) -> List[str]: + """Фильтрует рекомендации (заглушка). + + TODO: убрать дубликаты, нерелевантные и т.д. 
+ """ + return list(recommendations) + + +# --- HyPEROptimizer --- + + +class HyPEROptimizer(Optimizer): + """HyPE с итеративным уточнением через рекомендации на основе оценки.""" + + def __init__( + self, + model: Any, + *, + n_candidates: int = 3, + top_n_candidates: int = 2, + k_samples: int = 3, + mini_batch_size: int = 16, + n_iterations: int = 2, + ) -> None: + super().__init__(model) + self.hype = HyPEOptimizer(model) + self.n_candidates = n_candidates + self.top_n_candidates = top_n_candidates + self.k_samples = k_samples + self.mini_batch_size = mini_batch_size + self.n_iterations = n_iterations + + def optimize( + self, + prompt: str, + dataset: Sequence[str], + targets: Sequence[str | int], + meta_info: Optional[dict[str, Any]] = None, + ) -> str: + """Генерирует кандидатов, оценивает, обновляет recommendations, повторяет.""" + hype = self.hype + best_candidate = prompt + + for iteration in range(self.n_iterations): + # 1. Генерация n_candidates + candidates: List[str] = [] + for _ in range(self.n_candidates): + candidate = hype.optimize(prompt, meta_info) + candidates.append(candidate) + + if not candidates: + return best_candidate + + # 2. Мини-батч + samples, sample_targets = sample_mini_batch( + dataset, targets, self.mini_batch_size + ) + if not samples: + best_candidate = candidates[0] + if iteration == self.n_iterations - 1: + return best_candidate + continue + + # 3. Оценка (заглушка Evaluator) + scored: List[Tuple[float, str, EvalResult]] = [] + for cand in candidates: + res = _evaluate_candidate_stub(cand, samples, sample_targets) + scored.append((res.aggregate_score, cand, res)) + + # 4. Top-k кандидатов + scored.sort(key=lambda x: x[0], reverse=True) + best_candidate = scored[0][1] + + if iteration == self.n_iterations - 1: + return best_candidate + + top = scored[: self.top_n_candidates] + + # 5. 
Собираем k_samples FailedExample для top + all_failed: List[FailedExample] = [] + for _, _, res in top: + for fe in res.failed_examples[: self.k_samples]: + all_failed.append(fe) + + # 6. FeedbackModule → рекомендации + recs = _feedback_module_stub(all_failed, self.k_samples) + recs = filter_recommendations(recs) + + # 7. Обновляем recommendations в мета-промпте hype + hype.update_section("recommendations", recs) + + return best_candidate diff --git a/coolprompt/prompt_assistant/test.py b/coolprompt/prompt_assistant/test.py new file mode 100644 index 0000000..8395d49 --- /dev/null +++ b/coolprompt/prompt_assistant/test.py @@ -0,0 +1,23 @@ +from pathlib import Path +import sys + +from langchain_openai import ChatOpenAI + +path_proj = str(Path(__file__).resolve().parent.parent.parent) +print(path_proj) +sys.path.append(path_proj) +from coolprompt.assistant import PromptTuner + +llm = ChatOpenAI( + model="gpt-3.5-turbo", + openai_api_key="", + temperature=0.7, + max_tokens=4000, + timeout=60, + max_retries=2, + # rate_limiter=rate_limiter +) +start_prompt = "а как мне стать лучшей версией себя" +final_prompt = PromptTuner(llm).run(start_prompt) +# assistant = PromptAssistant(llm) +# print(assistant.get_feedback(start_prompt, final_prompt)) diff --git a/coolprompt/task_detector/detector.py b/coolprompt/task_detector/detector.py index eefa330..94565e1 100644 --- a/coolprompt/task_detector/detector.py +++ b/coolprompt/task_detector/detector.py @@ -6,10 +6,10 @@ from pydantic import BaseModel from coolprompt.task_detector.pydantic_formatters import ( - TaskDetectionStructuredOutputSchema + TaskDetectionStructuredOutputSchema, ) from coolprompt.utils.prompt_templates.task_detector_templates import ( - TASK_DETECTOR_TEMPLATE + TASK_DETECTOR_TEMPLATE, ) from coolprompt.utils.logging_config import logger from coolprompt.utils.parsing import extract_json @@ -42,11 +42,11 @@ def _generate( Returns: Any: generated data """ - if hasattr(self.model, 'model'): + if 
hasattr(self.model, "model"): wrapped_model = self.model.model else: wrapped_model = self.model - + if not isinstance(wrapped_model, BaseChatModel): output = self.model.invoke(request) if isinstance(output, AIMessage): @@ -81,18 +81,12 @@ def generate( schema = TaskDetectionStructuredOutputSchema request = TASK_DETECTOR_TEMPLATE - request = request.format( - query=prompt - ) + request = request.format(query=prompt) - logger.info( - "Detecting the task by query" - ) + logger.info("Detecting the task by query") task = self._generate(request, schema, "task") - logger.info( - f"Task defined as {task}" - ) + logger.info(f"Task defined as {task}") return task diff --git a/coolprompt/utils/arithmetics.py b/coolprompt/utils/arithmetics.py index e8855ee..afd9ee2 100644 --- a/coolprompt/utils/arithmetics.py +++ b/coolprompt/utils/arithmetics.py @@ -14,4 +14,7 @@ def mean(lst): def extract_number_from_text(text): - return re.findall(r'-?\d+(?:\.\d+)?', text)[-1] + try: + return re.findall(r"-?\d+(?:\.\d+)?", text)[-1] + except: + return "" diff --git a/coolprompt/utils/enums.py b/coolprompt/utils/enums.py index 1492647..1ff74f1 100644 --- a/coolprompt/utils/enums.py +++ b/coolprompt/utils/enums.py @@ -1,23 +1,24 @@ -from enum import Enum - - -class Method(Enum): - HYPE = "hype" - REFLECTIVE = "reflective" - DISTILL = "distill" - - def is_data_driven(self) -> bool: - if self is Method.HYPE: - return False - return True - - def __str__(self): - return self.value - - -class Task(Enum): - CLASSIFICATION = "classification" - GENERATION = "generation" - - def __str__(self): - return self.value +from enum import Enum + + +class Method(Enum): + HYPE = "hype" + HYPER = "hyper" + REFLECTIVE = "reflective" + DISTILL = "distill" + + def is_data_driven(self) -> bool: + if self is Method.HYPE: + return False + return True + + def __str__(self): + return self.value + + +class Task(Enum): + CLASSIFICATION = "classification" + GENERATION = "generation" + + def __str__(self): + return 
self.value diff --git a/coolprompt/utils/parsing.py b/coolprompt/utils/parsing.py index ebec72e..515c3ee 100644 --- a/coolprompt/utils/parsing.py +++ b/coolprompt/utils/parsing.py @@ -1,7 +1,7 @@ from dirtyjson import DirtyJSONLoader from typing import Tuple + from langchain_core.language_models.base import BaseLanguageModel -from langchain_core.messages.ai import AIMessage def extract_answer( @@ -55,13 +55,13 @@ def safe_template(template: str, **kwargs) -> str: return template.format(**escaped) -def extract_json(text: str) -> dict | None: - """Extracts the first valid JSON with one text value from the `text`. +def extract_json(text: str) -> dict | list | None: + """Extracts the first valid JSON (object or array) from the text. Args: - text (str): text with JSON-lke substrings. + text (str): text with JSON-like substrings. Returns: - result (dict | None): dict from JSON or None + result (dict | list | None): dict or list from JSON or None (if no valid JSON substrings found). """ @@ -72,13 +72,30 @@ def extract_json(text: str) -> dict | None: pos = 0 while pos < len(text): + # Find both { and [ start_pos = text.find("{", pos) - if start_pos == -1: + bracket_pos = text.find("[", pos) + + # Get earliest position + if start_pos == -1 and bracket_pos == -1: break + elif start_pos == -1: + search_pos = bracket_pos + elif bracket_pos == -1: + search_pos = start_pos + else: + search_pos = min(start_pos, bracket_pos) + try: - return dict(loader.decode(start_index=start_pos)) - except: - pos = start_pos + 1 + result = loader.decode(start_index=search_pos) + if isinstance(result, dict): + return dict(result) + elif isinstance(result, list): + return list(result) + except Exception: + pass + + pos = search_pos + 1 return None @@ -118,21 +135,46 @@ def parse_assistant_response(answer: str) -> str: return answer.strip() -def get_model_answer_extracted(llm: BaseLanguageModel, prompt: str) -> str: - """Gets `llm`'s response for the `prompt` and extracts the answer. 
- - Args: - llm (BaseLanguageModel): LangChain language model. - prompt (str): prompt for the model. - Returns: - str: extracted answer or empty string if there is no final answer. - """ +from typing import Tuple - answer = llm.invoke(prompt) - if isinstance(answer, AIMessage): - answer = answer.content +def get_model_answer_extracted( + llm: BaseLanguageModel, + prompt: str, + n: int = 1, + temperature=None, +): + if temperature is not None: + llm = llm.bind(temperature=temperature) - answer = parse_assistant_response(answer) + if n == 1: + resp = llm.invoke(prompt) + text = resp.content if hasattr(resp, "content") else str(resp) + return parse_assistant_response(text) - return answer + if hasattr(llm, "generate"): + try: + llm_n = llm.bind(n=n) + result = llm_n.generate([prompt]) + gens = result.generations[0] + + outputs = [] + for g in gens: + text = getattr(g, "text", str(g)) + outputs.append(parse_assistant_response(text)) + + if len(outputs) >= n: + return outputs[:n] + except Exception: + pass + + duplicated = [prompt] * n + responses = llm.batch(duplicated) + + outputs = [] + for r in responses: + text = r.content if hasattr(r, "content") else str(r) + outputs.append(parse_assistant_response(text)) + outputs = list(dict.fromkeys(outputs)) # hard deduplication + + return outputs diff --git a/coolprompt/utils/prompt_templates/hype_templates.py b/coolprompt/utils/prompt_templates/hype_templates.py deleted file mode 100644 index fbc09cc..0000000 --- a/coolprompt/utils/prompt_templates/hype_templates.py +++ /dev/null @@ -1,51 +0,0 @@ -HYPE_PROMPT_TEMPLATE = ( - "You are an expert prompt engineer. Your only task is to " - "generate a hypothetical instructive prompt that would help " - "a large language model effectively answer the following query. " - "The prompt must solve the same underlying task as the original query while being more effective.\n" - "### HARD CONSTRAINTS ###\n" - "1. 
LANGUAGE:\n" - " - Output MUST be in the EXACT SAME LANGUAGE as the query.\n" - "2. CONTENT:\n" - " - Output ONLY the hypothetical instructive prompt - do NOT answer the original query directly.\n" - " - The hypothetical prompt must solve the same task as the original query provided by user.\n" - " - If the original query contains any code snippets, you must include it in final prompt.\n" - "3. TECHNICAL PRESERVATION:\n" - " - Code blocks must be preserved with original syntax and formatting.\n" - " - Variables, placeholders ({{var}}), and technical terms kept unchanged.\n" - " - Markdown and special formatting replicated precisely.\n" - "### YOUR OUTPUT FORMAT ###\n" - "[PROMPT_START][PROMPT_END]\n" - "### INPUT ###\n" - "User's query: {QUERY}\n" - "Problem description: {PROBLEM_DESCRIPTION}\n" - "### OUTPUT ###\n" - "Hypothetical Instructive Prompt: " -) - -CLASSIFICATION_TASK_TEMPLATE_HYPE = """{PROMPT} - -Answer using exactly one label from [{LABELS}]. -Generate the final answer bracketed with and . -Examples: -1. Labels are [(A), (B), (C)] and you chose the first option - Output will be: (A) -2. Labels are [A, B, C] and you chose the first option - Output will be: A - -Input: -{INPUT} - -Response: -""" - -GENERATION_TASK_TEMPLATE_HYPE = """{PROMPT} - -Provide a direct answer without additional explanations or commentary. -Generate the final answer bracketed with and . - -INPUT: -{INPUT} - -RESPONSE: -""" diff --git a/coolprompt/utils/prompt_templates/hyper_templates.py b/coolprompt/utils/prompt_templates/hyper_templates.py new file mode 100644 index 0000000..7839019 --- /dev/null +++ b/coolprompt/utils/prompt_templates/hyper_templates.py @@ -0,0 +1,259 @@ +from dataclasses import dataclass, field +from typing import List, Optional + + +TARGET_PROMPT_FORMS = ["hypothetical ", "instructional "] + + +SIMPLE_HYPOTHETICAL_PROMPT = "Write a {target_prompt_form}prompt that will solve the user query effectively." 
+ +META_INFO_SECTION = "Task-related meta-information:\n\n{meta_info_content}\n\n" + +META_PROMPT_SECTIONS = ( + "role", + "prompt_structure", + "recommendations", + "constraints", + "output_format", +) + + +@dataclass +class PromptSectionSpec: + name: str + description: str + + +@dataclass +class HypeMetaPromptConfig: + target_prompt_form: str = "hypothetical instructional " + require_markdown_prompt: bool = True + include_role: bool = True + section_names: List[str] = field( + default_factory=lambda: [ + "Role", + "Task context", + "Instructions", + "Output requirements", + ] + ) + section_specs: List[PromptSectionSpec] = field( + default_factory=lambda: [ + PromptSectionSpec( + name="Role", + description=( + "Briefly define the assistant's role and expertise " + "relevant to the user query." + ), + ), + PromptSectionSpec( + name="Task context", + description=( + "Summarize the user's query and any provided meta-information, " + "keeping all important constraints and domain details." + ), + ), + PromptSectionSpec( + name="Instructions", + description=( + "Main part - instructions the assistant must follow " + "to solve the user's query while respecting constraints." + ), + ), + PromptSectionSpec( + name="Output requirements", + description=( + "Clearly specify the desired tone " + "and the required level of detail for the assistant's answer. " + "If the user explicitly requests a particular output format or provides " + "an example response, restate that format and include the example verbatim, " + "without inventing any additional formatting or examples. Do not introduce any output format or examples that the user did not mention." 
+ ), + ), + ] + ) + constraints: List[str] = field( + default_factory=lambda: [ + "Preserve the language of the user's query.", + "Preserve all code snippets, inline code, technical terms and special formatting.", + "Do not remove or alter any explicit formatting instructions from the user.", + "Do not change numerical values, units, or identifiers.", + ] + ) + recommendations: List[str] = field(default_factory=list) + output_format_section: Optional[str] = None + _cached_sections: dict = field(default_factory=dict, repr=False) + + +class HypeMetaPromptBuilder: + ROLE_LINE = "You are an expert prompt engineer.\n" + TASK_SECTION_TEMPLATE = ( + "Your only task is to write a {target_prompt_form}prompt that will " + "solve the user query as effectively as possible.\n" + "Do not answer the user query directly; only produce the new prompt.\n\n" + ) + + PROMPT_STRUCTURE_SECTION_TEMPLATE = ( + "### STRUCTURE OF THE PROMPT YOU MUST PRODUCE\n" + "The prompt you write MUST be structured into the following sections, " + "in this exact order, and each section must follow its guidelines:\n" + "{sections_with_guidelines}\n\n" + ) + + CONSTRAINTS_SECTION_TEMPLATE = ( + "### HARD CONSTRAINTS\n{constraints_list}\n\n" + ) + + RECOMMENDATIONS_SECTION_TEMPLATE = ( + "### RECOMMENDATIONS\n" + "Use these recommendations for writing the new prompt, " + "based on analysis of previous generations:\n" + "{recommendations_list}\n\n" + ) + + BASE_OUTPUT_FORMAT_SECTION = ( + "### YOUR RESPONSE FORMAT\n" + "Return ONLY the resulting prompt, wrapped in the following XML tags:\n" + "\n" + " ...your resulting prompt here...\n" + "\n" + "Do not include any explanations or additional text outside this XML element.\n\n" + ) + + MARKDOWN_OUTPUT_REQUIREMENTS = ( + "#### Markdown formatting for the resulting prompt\n" + "- Write the entire prompt inside using valid Markdown.\n" + "- Use headings (e.g., `#`, `##`) for major sections of the prompt.\n" + "- Use bulleted lists (e.g., `-` or `*`) for 
enumerations and checklists.\n" + "- Preserve any code or pseudo-code using fenced code blocks (``` ... ```).\n" + "- Do not introduce any additional formatting beyond what is necessary to make " + "the prompt clear and well-structured." + ) + + HYPE_META_PROMPT_TEMPLATE = ( + "{role_section}" + "{prompt_structure_section}" + "{recommendations_section}" + "{constraints_section}" + "{output_format_section}" + ) + + def __init__(self, config: HypeMetaPromptConfig | None = None) -> None: + self.config = config or HypeMetaPromptConfig() + self._cache_all_sections() + + def _cache_all_sections(self) -> None: + self.config._cached_sections = { + "role": self.build_role_section(), + "prompt_structure": self.build_prompt_structure_section(), + "output_format": self.build_output_format_section(), + } + + def get_cached_section(self, name: str) -> Optional[str]: + return self.config._cached_sections.get(name) + + # ----- секция роли ----- + def build_role_section(self, include_role: bool | None = None) -> str: + include_role = ( + include_role + if include_role is not None + else self.config.include_role + ) + form = self.config.target_prompt_form or "" + task_part = self.TASK_SECTION_TEMPLATE.format(target_prompt_form=form) + if include_role: + return self.ROLE_LINE + task_part + return task_part + + # ----- секция формата (список имён секций) ----- + def build_prompt_structure_section( + self, + specs: list[PromptSectionSpec] | None = None, + ) -> str: + specs = specs or self.config.section_specs + lines = [f"- [{spec.name}] {spec.description}" for spec in specs] + return self.PROMPT_STRUCTURE_SECTION_TEMPLATE.format( + sections_with_guidelines="\n".join(lines) + ) + + # ----- секция рекомендаций (на основе анализа предыдущих генераций) ----- + def build_recommendations_section( + self, + recommendations: List[str] | None = None, + ) -> str: + recs = ( + recommendations + if recommendations is not None + else self.config.recommendations + ) + if not recs: + return "" + 
lines = "\n".join(f"- {r}" for r in recs) + return self.RECOMMENDATIONS_SECTION_TEMPLATE.format( + recommendations_list=lines + ) + + # ----- секция жёстких ограничений ----- + def build_constraints_section( + self, + constraints: List[str] | None = None, + ) -> str: + constraints = constraints or self.config.constraints + if not constraints: + return "" + lines = "\n".join(f"- {c}" for c in constraints) + return self.CONSTRAINTS_SECTION_TEMPLATE.format(constraints_list=lines) + + def build_output_format_section(self) -> str: + # если в конфиге уже передан кастомный текст — используем его как базу + section = ( + self.config.output_format_section + or self.BASE_OUTPUT_FORMAT_SECTION + ) + if self.config.require_markdown_prompt: + section = section + self.MARKDOWN_OUTPUT_REQUIREMENTS + return section + + # ----- сборка всего мета‑промпта ----- + def build_meta_prompt( + self, + *, + target_prompt_form: str | None = None, + section_specs: List[PromptSectionSpec] | None = None, + recommendations: List[str] | None = None, + constraints: List[str] | None = None, + output_format_section: str | None = None, + include_role: bool | None = None, + ) -> str: + # локальный override конфигов + if target_prompt_form is not None: + self.config.target_prompt_form = target_prompt_form + if section_specs is not None: + self.config.section_specs = section_specs + if recommendations is not None: + self.config.recommendations = recommendations + if constraints is not None: + self.config.constraints = constraints + if output_format_section is not None: + self.config.output_format_section = output_format_section + if include_role is not None: + self.config.include_role = include_role + + role_section = self.build_role_section(include_role=include_role) + prompt_structure_section = self.build_prompt_structure_section() + recommendations_section = self.build_recommendations_section( + recommendations=recommendations + ) + constraints_section = self.build_constraints_section() + 
output_format_section = self.build_output_format_section() + + return self.HYPE_META_PROMPT_TEMPLATE.format( + role_section=role_section, + prompt_structure_section=prompt_structure_section, + recommendations_section=recommendations_section, + constraints_section=constraints_section, + output_format_section=output_format_section, + ) + + def rebuild_all_sections(self) -> None: + self._cache_all_sections() diff --git a/src/prompts_scoring/prompts_scoring_example.ipynb b/src/prompts_scoring/prompts_scoring_example.ipynb new file mode 100644 index 0000000..e69de29 diff --git a/src/solutions/HyPE/ablation/generate_prompts.py b/src/solutions/HyPE/ablation/generate_prompts.py new file mode 100644 index 0000000..e69de29 diff --git a/src/solutions/HyPE/ablation/inference.py b/src/solutions/HyPE/ablation/inference.py new file mode 100644 index 0000000..9a9a635 --- /dev/null +++ b/src/solutions/HyPE/ablation/inference.py @@ -0,0 +1,157 @@ +import itertools +import json +import sys +from pathlib import Path +from datetime import datetime +from typing import List + +project_path = str(Path(__file__).resolve().parent.parent.parent.parent.parent) +sys.path.insert(0, project_path) + +from coolprompt.utils.prompt_templates.hyper_templates import ( + HypeMetaPromptBuilder, + PromptSectionSpec, +) + +# Твой HypeMetaPromptBuilder + HypeMetaPromptConfig вставляем сюда + + +def generate_sections_config( + include_role_section: bool, include_output_section: bool +) -> List[PromptSectionSpec]: + """Генерирует конфиг секций по флагам""" + base_sections = [ + PromptSectionSpec( + name="Instructions", + description=( + "Main part - instructions the assistant must follow " + "to solve the user's query while respecting constraints." + ), + ), + ] + + if include_role_section: + base_sections.insert( + 0, + PromptSectionSpec( + name="Role", + description=( + "Briefly define the assistant's role and expertise " + "relevant to the user query." 
+ ), + ), + ) + + if include_output_section: + base_sections.append( + PromptSectionSpec( + name="Output requirements", + description=( + "Clearly specify the desired tone and required level of detail. " + "If the user explicitly requests a particular output format or " + "provides an example response, restate that format and include " + "the example verbatim, without inventing any additional formatting." + ), + ) + ) + + return base_sections + + +def _make_variant_name( + target_form: str, + include_role: bool, + role_section: bool, + output_section: bool, + use_markdown: bool, +) -> str: + """Имя варианта: TF_R_RS_OS_MD""" + tf = "hyp_inst" if "instructional" in target_form else "hyp" + return f"TF{tf}_R{int(include_role)}_RS{int(role_section)}_OS{int(output_section)}_MD{int(use_markdown)}" + + +def main_32variants(): + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + out_dir = Path("ablation_prompts") + out_dir.mkdir(exist_ok=True) + json_file = out_dir / f"meta_prompts_32v_{timestamp}.json" + + builder = HypeMetaPromptBuilder() + + # 5 факторов × 2 уровня = 32 + factors = [ + ["", "instructional ", "hypothetical instructional "], + [True, False], # include_role + [True, False], # role_section + [True, False], # output_requirements_section + [False], # markdown + ] + + total_variants = 32 + print(f"🚀 Генерируем 32 варианта мета-промптов → {json_file}") + + prompts: dict[str, str] = {} + + for combo in itertools.product(*factors): + ( + target_form, + include_role, + role_section, + output_section, + use_markdown, + ) = combo + + # Генерируем секции по флагам + specs = generate_sections_config(role_section, output_section) + + # Включаем markdown + orig_markdown = builder.config.require_markdown_prompt + builder.config.require_markdown_prompt = use_markdown + + # Строим промпт (constraints пока отключены) + meta_prompt = builder.build_meta_prompt( + target_prompt_form=target_form, + section_specs=specs, + constraints=[], + include_role=include_role, + ) + + 
name = _make_variant_name( + target_form, + include_role, + role_section, + output_section, + use_markdown, + ) + prompts[name] = meta_prompt + + print(f"✅ {name}") + builder.config.require_markdown_prompt = orig_markdown + + payload = { + "meta": { + "timestamp": timestamp, + "total_variants": total_variants, + "factors": [ + "target_form", + "include_role", + "role_section", + "output_section", + "markdown", + ], + "naming": "TF{hyp|hyp_inst}_R{0|1}_RS{0|1}_OS{0|1}_MD{0|1}", + }, + "prompts": prompts, + } + + with open(json_file, "w", encoding="utf-8") as f: + json.dump(payload, f, ensure_ascii=False, indent=2) + + print(f"\n🎉 Готово! 32 варианта в {json_file}") + print( + f"📊 Naming: TF{{hyp|hyp_inst}}_R{{0|1}}_RS{{0|1}}_OS{{0|1}}_MD{{0|1}}" + ) + + +if __name__ == "__main__": + main_32variants() diff --git a/src/solutions/HyPE/config_dict.py b/src/solutions/HyPE/config_dict.py index c05ec5d..cf824e6 100644 --- a/src/solutions/HyPE/config_dict.py +++ b/src/solutions/HyPE/config_dict.py @@ -5,23 +5,14 @@ gsm8k_preproc, common_gen, common_gen_preproc, - ag_news, - ag_news_preproc, + tweeteval, + tweeteval_preproc, xsum, xsum_preproc, ) config_dict = { - "squad_v2": { - "start_prompt": "Given a context answer on the question.", - "task": "generation", - "metric": "bertscore", - "preproc": squad_v2_preproc, - "data": squad_v2, - "test_name": "validation", - "problem_description": "question answering", - }, "gsm8k": { "start_prompt": "Given a context answer on the question.", "task": "generation", @@ -31,6 +22,15 @@ "test_name": "test", "problem_description": "math solving", }, + "squad_v2": { + "start_prompt": "Given a context answer on the question.", + "task": "generation", + "metric": "bertscore", + "preproc": squad_v2_preproc, + "data": squad_v2, + "test_name": "validation", + "problem_description": "question answering", + }, "common_gen": { "start_prompt": "Create a short sentence using words in list.", "task": "generation", @@ -40,12 +40,12 @@ "test_name": 
"validation", "problem_description": "create a sentence", }, - "ag_news": { - "start_prompt": "Classify news and provide number of topic from dict {{World: 0, Sports: 1, Business: 2, Sci/Tech: 3}}", + "tweeteval": { + "start_prompt": "Provide sentiment classification.", "task": "classification", "metric": "f1", - "preproc": ag_news_preproc, - "data": ag_news, + "preproc": tweeteval_preproc, + "data": tweeteval, "test_name": "test", "problem_description": "classification", }, diff --git a/src/solutions/HyPE/hype_test.py b/src/solutions/HyPE/hype_test.py index 697ca58..fd78864 100644 --- a/src/solutions/HyPE/hype_test.py +++ b/src/solutions/HyPE/hype_test.py @@ -1,93 +1,100 @@ -import random +import os import sys from typing import Any from pathlib import Path import json +import numpy as np + +from langchain_openai import ChatOpenAI +from langchain_core.rate_limiters import InMemoryRateLimiter import pandas as pd -from sklearn.model_selection import train_test_split project_path = str(Path(__file__).resolve().parent.parent.parent.parent) print(project_path) sys.path.append(project_path) from config_dict import config_dict -from src.utils.load_dataset_coolprompt import ag_labels +from src.utils.load_dataset_coolprompt import tweeteval_emotions from coolprompt.assistant import PromptTuner -from coolprompt.language_model.llm import DefaultLLM -llm = DefaultLLM.init() +# llm = DefaultLLM.init(vllm_engine_config={"gpu_memory_utilization": 0.95}) +rate_limiter = InMemoryRateLimiter( + requests_per_second=1, check_every_n_seconds=0.1, max_bucket_size=10 +) +model = "gpt-4o-mini" +llm = ChatOpenAI( + model=model, + temperature=0.7, + max_completion_tokens=4000, + max_retries=5, + rate_limiter=rate_limiter, + api_key="", + extra_body={ + "allowed_providers": ["google-vertex", "azure"], + }, + base_url="https://openrouter.ai/api/v1", +) pt = PromptTuner(llm) -def manage_ag_news(data: pd.DataFrame, max_imbalance: float = 0.6): - if 
set(data["target"].unique()).issubset(set(ag_labels)): - class_proportions = data["target"].value_counts(normalize=True) - if class_proportions.max() > max_imbalance: - return None - else: - return data - - def sample( data: pd.DataFrame, sample_size: int = None, seed: int = 42, ) -> pd.DataFrame: - if sample_size is not None: - if set(data["target"].unique()).issubset(set(ag_labels)): - _, data_sample = train_test_split( - data, - train_size=sample_size, - stratify=data["target"], - random_state=seed, - ) - else: - rng = random.Random(seed) - - total_size = len(data) - n = min(sample_size, total_size) - - indices = rng.sample(range(total_size), n) + np.random.seed(seed) + if sample_size is None: + return data - data_sample = data.iloc[indices] + if set(data["target"].unique()).issubset(set(tweeteval_emotions)): + min_class_size = data["target"].value_counts().min() + per_class = min(sample_size // len(tweeteval_emotions), min_class_size) - return data_sample - return data + balanced_parts = [ + df.sample(per_class, random_state=seed) for _, df in data.groupby("target") + ] + return pd.concat(balanced_parts).reset_index(drop=True) + else: + return data.sample(sample_size, random_state=seed) def run_hype_dataset() -> dict[str, Any]: - result = {} + result = {"model": model} for task, cfg in config_dict.items(): - data_train, data_val = cfg["data"]["train"], cfg["data"]["validation"] - preproc_data = cfg["preproc"](data_val) - data_sample = sample(preproc_data, sample_size=100) - dataset, target = list(data_sample["input_data"]), list( - data_sample["target"] - ) - - final_prompt = pt.run( - cfg["start_prompt"], - cfg["task"], - dataset, - target, - "hype", - cfg["metric"], - cfg["problem_description"], - verbose=2, - train_as_test=True, - sample_answers=True, + data_train, data_val = ( + cfg["data"]["train"], + cfg["data"][cfg["test_name"]], ) + preproc_data = cfg["preproc"](data_val) + data_sample = sample(preproc_data, sample_size=10) + dataset, target = 
list(data_sample["input_data"]), list(data_sample["target"]) + + try: + final_prompt = pt.run( + cfg["start_prompt"], + cfg["task"], + dataset, + target, + "hyper", + cfg["metric"], + cfg["problem_description"], + verbose=2, + train_as_test=True, + feedback=False, + ) - result[task] = { - "metric": { - "name": cfg["metric"], - "start_score": pt.init_metric, - "final_metric": pt.final_metric, - }, - "prompt": final_prompt, - "samples": pt.answer_samples, - } + result[task] = { + "metric": { + "name": cfg["metric"], + "start_score": pt.init_metric, + "final_metric": pt.final_metric, + }, + "prompt": final_prompt, + } + except Exception as e: + print(f"!!!!EXCEPTION: {str(e)}!!!!") + result[task] = {"exception": str(e)} return result @@ -95,11 +102,13 @@ def run_hype_dataset() -> dict[str, Any]: def test(path: str | Path) -> None: with open(path, "w") as f: result = run_hype_dataset() + print("Saving to", os.path.abspath(path)) json.dump(result, f) + print(f"Successfully wrote to {path}") def main(): - test("./logs/test_1.json") + test("./logs/result.json") if __name__ == "__main__": diff --git a/src/solutions/HyPE/llm.py b/src/solutions/HyPE/llm.py new file mode 100644 index 0000000..25372b3 --- /dev/null +++ b/src/solutions/HyPE/llm.py @@ -0,0 +1,80 @@ +from langchain_community.callbacks.manager import get_openai_callback +from langchain_core.language_models.base import BaseLanguageModel + + +class TrackedLLMWrapper: + """Простая обертка вокруг ChatOpenAI с трекингом""" + + def __init__(self, model, tracker): + self.model = model + self.tracker = tracker + + @property + def __class__(self): + return BaseLanguageModel + + def invoke(self, input, **kwargs): + with get_openai_callback() as cb: + result = self.model.invoke(input, **kwargs) + self.tracker._update_stats(cb, True) + return result + + def batch(self, inputs, **kwargs): + with get_openai_callback() as cb: + results = self.model.batch(inputs, **kwargs) + self.tracker._update_stats(cb, False, 
batch_size=len(inputs)) + return results + + def reset_stats(self): + self.tracker.reset_stats() + + def get_stats(self): + return self.tracker.get_stats() + + # Проксируем остальные методы + def __getattr__(self, name): + return getattr(self.model, name) + + +class OpenAITracker: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._reset_stats() + return cls._instance + + def _reset_stats(self): + self.stats = { + "total_calls": 0, + "total_tokens": 0, + "prompt_tokens": 0, + "completion_tokens": 0, + "total_cost": 0.0, + "invoke_calls": 0, + "batch_calls": 0, + "batch_items": 0, + } + + def _update_stats(self, callback, invoke_flag, **kwargs): + self.stats["total_calls"] += 1 + self.stats["total_tokens"] += callback.total_tokens + self.stats["prompt_tokens"] += callback.prompt_tokens + self.stats["completion_tokens"] += callback.completion_tokens + self.stats["total_cost"] += callback.total_cost + if invoke_flag: + self.stats["invoke_calls"] += 1 + else: + self.stats["batch_calls"] += 1 + self.stats["batch_items"] += kwargs.get("batch_size", 0) + + def wrap_model(self, model): + """Обертывает модель для трекинга""" + return TrackedLLMWrapper(model, self) + + def get_stats(self): + return self.stats.copy() + + def reset_stats(self): + self._reset_stats() diff --git a/src/utils/load_dataset_coolprompt.py b/src/utils/load_dataset_coolprompt.py index 99250f9..759df6d 100644 --- a/src/utils/load_dataset_coolprompt.py +++ b/src/utils/load_dataset_coolprompt.py @@ -4,15 +4,10 @@ squad_v2 = load_dataset("rajpurkar/squad_v2") gsm8k = load_dataset("openai/gsm8k", "main") common_gen = load_dataset("allenai/common_gen") -ag_news = load_dataset("fancyzhx/ag_news") +tweeteval = load_dataset("cardiffnlp/tweet_eval", "emotion") xsum = load_dataset("yairfeldman/xsum") -ag_labels = { - "World": 0, - "Sports": 1, - "Business": 2, - "Sci/Tech": 3, -} +tweeteval_emotions = {0: "anger", 1: "joy", 2: "optimism", 3: 
"sadness"} def squad_v2_preproc(sample, size: int = None): @@ -54,10 +49,12 @@ def common_gen_preproc(sample, size: int = None): return data -def ag_news_preproc(sample, size: int = None): +def tweeteval_preproc(sample, size: int = None): data = pd.DataFrame(sample) - data = data.rename(columns={"text": "input_data", "label": "target"}) + data["input_data"] = data["text"] + data["target"] = data["label"].apply(lambda x: tweeteval_emotions[x]) + if size: data = data.head(size) @@ -83,8 +80,8 @@ def get_data(): return gsm8k_preproc(gsm8k, size) case "common_gen": return common_gen_preproc(common_gen, size) - case "ag_new": - return ag_news_preproc(ag_news, size) + case "tweeteval": + return tweeteval_preproc(tweeteval, size) case "xsum": return xsum_preproc(xsum, size)