Source code for twon_lss.utility.llm
from abc import abstractmethod
import logging
import time
import typing

import pydantic
import requests


class Message(pydantic.BaseModel):
    role: typing.Literal["system", "user", "assistant"]
    content: str


class Chat(pydantic.RootModel):
    root: typing.List[Message]

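A minimal usage sketch (not part of the module itself): building a Chat from Message objects, assuming the module is importable under the path shown in the page title.

from twon_lss.utility.llm import Chat, Message

chat = Chat(
    root=[
        Message(role="system", content="You are a helpful assistant."),
        Message(role="user", content="Summarize the plot of Hamlet in one sentence."),
    ]
)

# model_dump() serializes the RootModel back to a plain list of dicts,
# which is the payload format LLM.generate() sends to the chat API.
print(chat.model_dump())
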
# LLM Implementation
class LLM(pydantic.BaseModel):
    api_key: str
    model: str = "Qwen/Qwen3-4B-Instruct-2507:nscale"
    url: str = "https://router.huggingface.co/v1/chat/completions"

    def _query(self, payload):
        headers: dict = {"Authorization": f"Bearer {self.api_key}"}
        response = requests.post(self.url, headers=headers, json=payload)
        return response.json()

    def generate(self, chat: Chat, max_retries: int = 5) -> str:
        """
        Sends the chat to the configured chat-completions endpoint and returns
        the assistant's reply. On failure, waits 60 seconds and retries up to
        `max_retries` times before raising a RuntimeError.
        """
        try:
            response: str = self._query(
                {
                    "messages": chat.model_dump(),
                    "model": self.model,
                }
            )["choices"][0]["message"]["content"]
        except Exception as e:
            logging.error(f"Failed to query LLM: {e}")
            if max_retries > 0:
                time.sleep(60)
                return self.generate(chat, max_retries - 1)
            raise RuntimeError(
                "Failed to generate response from LLM after retries"
            ) from e

        return response

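A usage sketch for LLM.generate, assuming a valid Hugging Face router token; the default model and url above are used unless overridden, and HF_TOKEN is an assumed environment variable name.

import os

from twon_lss.utility.llm import LLM, Chat, Message

llm = LLM(api_key=os.environ["HF_TOKEN"])  # HF_TOKEN is an assumed env variable name
chat = Chat(
    root=[
        Message(role="system", content="You are a helpful assistant."),
        Message(role="user", content="Name three European capitals."),
    ]
)

# Retries up to 5 times (sleeping 60s between attempts) before raising RuntimeError.
answer = llm.generate(chat)
print(answer)
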
# Embedding Model Interface and Implementations
class EmbeddingModelInterface(pydantic.BaseModel):
    """Shared interface for embedding backends."""

    @abstractmethod
    def extract(
        self, text: typing.Optional[typing.Union[str, list]], max_retries: int = 3
    ):
        pass

class LocalEmbeddingModel(EmbeddingModelInterface):
    model_name: str = "mixedbread-ai/mxbai-embed-large-v1"
    batch_size: int = 16
    model: typing.Any = None

    def model_post_init(self, context):
        # Imported lazily so that sentence-transformers is only required
        # when the local backend is actually used.
        from sentence_transformers import SentenceTransformer

        self.model = SentenceTransformer(self.model_name)

    def extract(self, text: typing.Optional[typing.Union[str, list]]):
        # A single string yields one embedding; a list of strings is encoded
        # in batches of `batch_size` and yields one embedding per text.
        if isinstance(text, str):
            return self.model.encode([text])[0].tolist()
        elif isinstance(text, list):
            embeddings = []
            for i in range(0, len(text), self.batch_size):
                batch = text[i : i + self.batch_size]
                emb_batch = self.model.encode(batch)
                embeddings.extend([emb.tolist() for emb in emb_batch])
            return embeddings

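A usage sketch for LocalEmbeddingModel, assuming sentence-transformers is installed; the default model weights are downloaded on first instantiation.

from twon_lss.utility.llm import LocalEmbeddingModel

embedder = LocalEmbeddingModel()  # loads the default mixedbread-ai/mxbai-embed-large-v1 model

# A single string returns one embedding vector (a list of floats) ...
vector = embedder.extract("The cat sat on the mat.")

# ... while a list of strings is encoded in batches and returns one vector per text.
vectors = embedder.extract(["first document", "second document", "third document"])
print(len(vector), len(vectors))
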
class APIEmbeddingModel(EmbeddingModelInterface):
    api_key: str
    model: str = "Qwen/Qwen3-4B-Instruct-2507:nscale"
    url: str = "https://router.huggingface.co/v1/chat/completions"

    def _query(self, payload):
        headers: dict = {"Authorization": f"Bearer {self.api_key}"}
        response = requests.post(self.url, headers=headers, json=payload)
        return response.json()
    def extract(self, text: typing.Optional[typing.Union[str, list]], max_retries: int = 3):
        """
        Returns embeddings for a single text or a list of texts.
        """
        if self.url == "https://router.huggingface.co/v1/chat/completions":
            raise ValueError(
                "Extract endpoint not supported for the chat-completions API. "
                "Use an HF-Inference URL that includes the feature-extraction endpoint and model."
            )

        if len(text) < 100:
            try:
                return self._query({"inputs": text})
            except Exception as e:
                logging.error(f"Failed to extract embeddings: {e}")
                if max_retries > 0:
                    time.sleep(5)
                    return self.extract(text, max_retries - 1)
                raise RuntimeError("Failed to extract embeddings after retries") from e
        else:
            # Chunking for long inputs: split into slices of 100 items
            # (or characters, for a single long string) and concatenate the results.
            CHUNK_SIZE = 100
            chunks = [text[i : i + CHUNK_SIZE] for i in range(0, len(text), CHUNK_SIZE)]
            embeddings = []
            for chunk in chunks:
                try:
                    emb_chunk = self._query({"inputs": chunk})
                    embeddings.extend(emb_chunk)
                except Exception as e:
                    logging.error(f"Failed to extract embeddings for chunk: {e}")
                    if max_retries > 0:
                        time.sleep(5)
                        emb_chunk = self.extract(chunk, max_retries - 1)
                        embeddings.extend(emb_chunk)
                    else:
                        raise RuntimeError("Failed to extract embeddings after retries") from e

            return embeddings
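
A usage sketch for APIEmbeddingModel. As the guard in extract() indicates, the default chat-completions url must be replaced with a feature-extraction endpoint; the url and model below are illustrative placeholders, not verified values, and HF_TOKEN is an assumed environment variable name.

import os

from twon_lss.utility.llm import APIEmbeddingModel

embedder = APIEmbeddingModel(
    api_key=os.environ["HF_TOKEN"],  # assumed env variable name
    model="mixedbread-ai/mxbai-embed-large-v1",  # illustrative model choice
    url="https://example-inference-endpoint/feature-extraction",  # placeholder; point this at your HF-Inference feature-extraction endpoint
)

# Short inputs are sent in a single request; longer inputs (or long lists)
# are split into chunks of 100 and the resulting embeddings are concatenated.
vectors = embedder.extract(["first document", "second document"])
print(len(vectors))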