Orchestrating Predictable AI Agents with Parlant and Regolo

Purpose vs. Predictability

Have you ever tried to give an LLM a specific purpose, only to receive unpredictable, poorly structured, or “hallucinated” outputs? Prompt engineering can only take you so far. When building production-ready agents, you need more than just a good prompt; you need a behavioral framework.

This is the exact problem Parlant solves. By integrating Parlant with Regolo’s high-performance models, you can build agents that are both intelligent and strictly aligned with your business logic.


What is Parlant?

Parlant is an open-source alignment and conversation modeling engine. Unlike rigid flowcharts or loose system prompts, Parlant uses a structured behavioral model: a set of granular principles and objectives that guide how a model interacts in specific contexts.

Why use it?

  • Reliability: Ensures agents stay within the bounds of business rules.
  • Explainability: Provides tools to understand why an agent made a specific decision.
  • Control: Includes built-in session management and content filtering.
  • Open Source: Released under the Apache 2.0 license.

Integration: Parlant with Regolo

To use Regolo as your custom AI provider, you implement Parlant’s NLPService interface. This lets Parlant talk to Regolo’s OpenAI-compatible API while keeping its structured behavioral checks.
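Because the API is OpenAI-compatible, a chat request to Regolo has the same shape as one to OpenAI; only the base URL and the key change. The sketch below builds such a request with the standard library so the wire format is visible, and it does not send anything. The base URL, the REGOLO_API_KEY variable, and the model name come from the service code in Step 1 below. The helper name `build_chat_request` is illustrative, and the `/chat/completions` path is assumed from the OpenAI convention.

```python
import json
import os
import urllib.request

REGOLO_BASE_URL = "https://api.regolo.ai/v1"  # same base URL the service in Step 1 uses


def build_chat_request(prompt: str, model: str, api_key: str) -> urllib.request.Request:
    """Build (but do not send) an OpenAI-style chat completion request.

    Hypothetical helper for illustration; the /chat/completions path is
    assumed from the OpenAI API convention.
    """
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        # Ask for a JSON object, as the non-strict path in Step 1 does.
        "response_format": {"type": "json_object"},
    }
    return urllib.request.Request(
        url=f"{REGOLO_BASE_URL}/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


if __name__ == "__main__":
    request = build_chat_request(
        prompt='Reply with a JSON object like {"ok": true}.',
        model="Llama-3.3-70B-Instruct",
        api_key=os.environ.get("REGOLO_API_KEY", "YOUR_REGOLO_API_KEY"),
    )
    print(request.full_url)
    print(request.data.decode("utf-8"))
```

Passing the request to `urllib.request.urlopen` would send it; the Parlant adapter in Step 1 uses the `openai` client instead, which produces the same kind of request.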

Step 1: Implement the Regolo Service

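Create a file named RegoloService.py. It defines the RegoloService class, estimates token counts with tiktoken, and maps each Parlant schema to a Regolo model. As written, every schema is routed to DeepSeek R1 70B; the other model classes (Llama 3.3, Qwen3, and so on) are defined so you can swap them into the mapping.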
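Mapping schemas to models is a dictionary lookup with a default: schemas that need a particular model get an explicit entry, and everything else falls through. Here is a minimal sketch of that pattern, using stand-in classes rather than Parlant's real schema and generator types. Every name in it is hypothetical, and the model assignments are arbitrary examples, not recommendations.

```python
from dataclasses import dataclass


# Stand-ins for Parlant schema types (hypothetical; the real ones are pydantic models).
class ToolCallSchema: ...
class CannedResponseSchema: ...
class OtherSchema: ...


@dataclass(frozen=True)
class ModelChoice:
    """Stand-in for a generator class: just records which model would be used."""

    model_name: str


DEFAULT_MODEL = ModelChoice("deepseek-r1-70b")

# Explicit overrides; any schema not listed here uses DEFAULT_MODEL.
SCHEMA_TO_MODEL: dict[type, ModelChoice] = {
    ToolCallSchema: ModelChoice("Llama-3.3-70B-Instruct"),
    CannedResponseSchema: ModelChoice("Qwen3-8B"),
}


def choose_model(schema: type) -> ModelChoice:
    """Return the model assigned to a schema, or the default if none is assigned."""
    return SCHEMA_TO_MODEL.get(schema, DEFAULT_MODEL)


if __name__ == "__main__":
    for schema in (ToolCallSchema, CannedResponseSchema, OtherSchema):
        print(f"{schema.__name__} -> {choose_model(schema).model_name}")
```

In the real service, the dictionary values are generator classes parameterized by the schema, and the selected class is instantiated with the logger.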

RegoloService.py:

Python

from __future__ import annotations
import time
from abc import ABC

from openai import (
    APIConnectionError,
    APIResponseValidationError,
    APITimeoutError,
    AsyncClient,
    ConflictError,
    InternalServerError,
    RateLimitError,
)
from typing import Any, Mapping
from typing_extensions import override
import json
import jsonfinder  # type: ignore
import os

from pydantic import ValidationError
import tiktoken

from parlant.adapters.nlp.common import normalize_json_output
from parlant.core.engines.alpha.canned_response_generator import (
    CannedResponseDraftSchema,
    CannedResponseSelectionSchema,
)
from parlant.core.engines.alpha.guideline_matching.generic.journey_node_selection_batch import (
    JourneyNodeSelectionSchema,
)
from parlant.core.engines.alpha.prompt_builder import PromptBuilder
from parlant.core.engines.alpha.tool_calling.single_tool_batch import SingleToolBatchSchema
from parlant.core.loggers import LogLevel, Logger
from parlant.core.nlp.policies import policy, retry
from parlant.core.nlp.tokenization import EstimatingTokenizer
from parlant.core.nlp.service import NLPService
from parlant.core.nlp.embedding import Embedder
from parlant.core.nlp.generation import (
    T,
    SchematicGenerator,
    SchematicGenerationResult,
)
from parlant.core.nlp.generation_info import GenerationInfo, UsageInfo
from parlant.core.nlp.moderation import ModerationService, NoModeration

RATE_LIMIT_ERROR_MESSAGE = (
    "Regolo API rate limit exceeded."
)


class RegoloEstimatingTokenizer(EstimatingTokenizer):
    def __init__(self, model_name: str) -> None:
        self.model_name = model_name
        # Approximate token counts with the gpt-4o encoding. Regolo's models use their own
        # tokenizers, so treat the result as an estimate rather than an exact count.
        try:
            self.encoding = tiktoken.encoding_for_model("gpt-4o")
        except KeyError:
            # Fallback to cl100k_base encoding if model not found
            self.encoding = tiktoken.get_encoding("cl100k_base")

    @override
    async def estimate_token_count(self, prompt: str) -> int:
        tokens = self.encoding.encode(prompt)
        return len(tokens)


class RegoloSchematicGenerator(SchematicGenerator[T], ABC):
    supported_openai_params = ["temperature", "logit_bias", "max_tokens"]
    supported_hints = supported_openai_params + ["strict"]

    def __init__(
            self,
            model_name: str,
            logger: Logger,
            tokenizer_model_name: str | None = None,
    ) -> None:
        self.model_name = model_name
        self._logger = logger

        self._client = AsyncClient(
            api_key=os.environ["REGOLO_API_KEY"],
            base_url="https://api.regolo.ai/v1"
        )

        self._tokenizer = RegoloEstimatingTokenizer(
            model_name=tokenizer_model_name or self.model_name
        )

    @property
    @override
    def id(self) -> str:
        return f"regolo/{self.model_name}"

    @property
    @override
    def tokenizer(self) -> RegoloEstimatingTokenizer:
        return self._tokenizer

    @policy(
        [
            retry(
                exceptions=(
                        APIConnectionError,
                        APITimeoutError,
                        ConflictError,
                        RateLimitError,
                        APIResponseValidationError,
                ),
            ),
            retry(InternalServerError, max_exceptions=2, wait_times=(1.0, 5.0)),
        ]
    )
    @override
    async def generate(
            self,
            prompt: str | PromptBuilder,
            hints: Mapping[str, Any] = {}, # noqa
    ) -> SchematicGenerationResult[T]:
        with self._logger.scope("RegoloSchematicGenerator"):
            with self._logger.operation(
                    f"LLM Request ({self.schema.__name__})", level=LogLevel.TRACE
            ):
                return await self._do_generate(prompt, hints)

    async def _do_generate(
            self,
            prompt: str | PromptBuilder,
            hints: Mapping[str, Any] = {}, # noqa
    ) -> SchematicGenerationResult[T]:
        if isinstance(prompt, PromptBuilder):
            prompt = prompt.build()

        openai_api_arguments = {k: v for k, v in hints.items() if k in self.supported_openai_params}

        if hints.get("strict", False):
            t_start = time.time()
            try:
                response = await self._client.beta.chat.completions.parse(
                    messages=[{"role": "user", "content": prompt}], # noqa
                    model=self.model_name,
                    response_format=self.schema,
                    **openai_api_arguments,
                )
            except RateLimitError:
                self._logger.error(RATE_LIMIT_ERROR_MESSAGE)
                raise

            t_end = time.time()

            if response.usage:
                self._logger.trace(response.usage.model_dump_json(indent=2))

            parsed_object = response.choices[0].message.parsed
            assert parsed_object

            assert response.usage

            # Handle cases where prompt_tokens_details might not exist
            cached_tokens = 0
            if hasattr(response.usage, 'prompt_tokens_details') and response.usage.prompt_tokens_details:
                cached_tokens = response.usage.prompt_tokens_details.cached_tokens or 0

            return SchematicGenerationResult[T](
                content=parsed_object,
                info=GenerationInfo(
                    schema_name=self.schema.__name__,
                    model=self.id,
                    duration=(t_end - t_start),
                    usage=UsageInfo(
                        input_tokens=response.usage.prompt_tokens,
                        output_tokens=response.usage.completion_tokens,
                        extra={"cached_input_tokens": cached_tokens},
                    ),
                ),
            )

        else:
            try:
                t_start = time.time()
                response = await self._client.chat.completions.create( # noqa
                    messages=[{"role": "user", "content": prompt}],
                    model=self.model_name,
                    response_format={"type": "json_object"},
                    **openai_api_arguments,
                )
                t_end = time.time()
            except RateLimitError:
                self._logger.error(RATE_LIMIT_ERROR_MESSAGE)
                raise

            if response.usage:
                self._logger.trace(response.usage.model_dump_json(indent=2))

            raw_content = response.choices[0].message.content or "{}"

            try:
                json_content = json.loads(normalize_json_output(raw_content))
            except json.JSONDecodeError:
                self._logger.warning(f"Invalid JSON returned by {self.model_name}:\n{raw_content}")
                json_content = jsonfinder.only_json(raw_content)[2]
                self._logger.warning("Found JSON content within model response; continuing...")

            try:
                content = self.schema.model_validate(json_content)

                assert response.usage

                # Handle cases where prompt_tokens_details might not exist
                cached_tokens = 0
                if hasattr(response.usage, 'prompt_tokens_details') and response.usage.prompt_tokens_details:
                    cached_tokens = response.usage.prompt_tokens_details.cached_tokens or 0

                return SchematicGenerationResult(
                    content=content,
                    info=GenerationInfo(
                        schema_name=self.schema.__name__,
                        model=self.id,
                        duration=(t_end - t_start),
                        usage=UsageInfo(
                            input_tokens=response.usage.prompt_tokens,
                            output_tokens=response.usage.completion_tokens,
                            extra={"cached_input_tokens": cached_tokens},
                        ),
                    ),
                )

            except ValidationError as e:
                self._logger.error(
                    f"Error: {e.json(indent=2)}\nJSON content returned by {self.model_name} does not match expected schema:\n{raw_content}"
                )
                raise


# Model implementations
class DeepSeekR170B(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="deepseek-r1-70b", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 128_000


class Gemma327BIt(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="gemma-3-27b-it", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 24_000


class GPTOss120B(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="gpt-oss-120b", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 128_000


class Llama318BInstruct(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="Llama-3.1-8B-Instruct", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 120_000


class Llama3370BInstruct(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="Llama-3.3-70B-Instruct", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 16_000


class MaestraleChatV04Beta(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="maestrale-chat-v0.4-beta", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 64_000


class MistralSmall32(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="mistral-small3.2", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 120_000


class Qwen330B(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="qwen3-30b", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 120_000


class Qwen3Coder30B(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="qwen3-coder-30b", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 120_000


class Qwen38B(RegoloSchematicGenerator[T]):
    def __init__(self, logger: Logger) -> None:
        super().__init__(model_name="Qwen3-8B", logger=logger)

    @property
    @override
    def max_tokens(self) -> int:
        return 40_960


class RegoloService(NLPService):
    @staticmethod
    def verify_environment() -> str | None:
        """Returns an error message if the environment is not set up correctly."""

        if not os.environ.get("REGOLO_API_KEY"):
            return """\
You're using the Regolo NLP service, but REGOLO_API_KEY is not set.
Please set REGOLO_API_KEY in your environment before running Parlant.
"""

        return None

    def __init__(self, logger: Logger) -> None:
        self._logger = logger
        self._logger.info("Initialized RegoloService")

    @override
    async def get_schematic_generator(self, t: type[T]) -> RegoloSchematicGenerator[T]:
        return {
            SingleToolBatchSchema: DeepSeekR170B[SingleToolBatchSchema],
            JourneyNodeSelectionSchema: DeepSeekR170B[JourneyNodeSelectionSchema],
            CannedResponseDraftSchema: DeepSeekR170B[CannedResponseDraftSchema],
            CannedResponseSelectionSchema: DeepSeekR170B[CannedResponseSelectionSchema],
        }.get(t, DeepSeekR170B[t])(self._logger)  # type: ignore

    @override
    async def get_embedder(self) -> Embedder:
        # Importing here to avoid circular dependency issues
        from parlant.adapters.nlp.hugging_face import JinaAIEmbedder
        return JinaAIEmbedder()

    @override
    async def get_moderation_service(self) -> ModerationService:
        return NoModeration()


class SDKError(Exception):
    """Main class for SDK-related errors."""

    def __init__(self, message: str) -> None:
        super().__init__(message)

from lagom import Container

def regolo(container: Container) -> NLPService:
    """Creates a regolo NLPService instance using the provided container."""

    if error := RegoloService.verify_environment():
        raise SDKError(error)

    return RegoloService(container[Logger])
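When a model wraps its JSON in extra text, the non-strict branch of `_do_generate` falls back to `jsonfinder` to locate the object. The sketch below shows the same idea with only the standard library, scanning for the first position where `json.JSONDecoder.raw_decode` succeeds. It illustrates the technique and is not the code Parlant or `jsonfinder` runs; `extract_first_json_object` is a hypothetical name.

```python
import json
from typing import Any


def extract_first_json_object(text: str) -> dict[str, Any]:
    """Return the first JSON object embedded in text.

    Tries a plain json.loads first, then scans each '{' and asks the decoder
    to parse from there. Raises ValueError if no object can be decoded.
    """
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            return parsed
    except json.JSONDecodeError:
        pass  # Fall through to the scan below.

    decoder = json.JSONDecoder()
    position = text.find("{")
    while position != -1:
        try:
            candidate, _end = decoder.raw_decode(text, position)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        position = text.find("{", position + 1)

    raise ValueError("no JSON object found in model output")


if __name__ == "__main__":
    noisy = (
        "Sure! Here is the result:\n"
        '{"intent": "book_test_drive", "confidence": 0.9}\n'
        "Hope that helps."
    )
    print(extract_first_json_object(noisy))
```

One difference worth noting: this sketch returns the first object it finds, whereas `jsonfinder.only_json` expects the text to contain exactly one JSON value and raises otherwise.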

Step 2: Launch the Agent

Now create a test script called serverTest.py, in the same directory as RegoloService.py, to initialize your agent using the Regolo service you just defined.

serverTest.py:

Python

import asyncio
import parlant.sdk as p
from RegoloService import regolo
import os

# Prefer exporting REGOLO_API_KEY in your shell; this placeholder is used only if it is unset
os.environ.setdefault("REGOLO_API_KEY", "YOUR_REGOLO_API_KEY")

async def main():
    async with p.Server(nlp_service=regolo) as server:
        agent = await server.create_agent(
            name="Otto Carmen",
            description="You work at a car dealership",
        )
        print(f"Agent {agent.name} is ready!")

asyncio.run(main())

With your API key in place, simply run python serverTest.py to see your agent in action. By bridging Parlant’s behavioral guardrails with Regolo’s diverse model selection, you can move away from “hope-based” prompting and toward production-grade AI reliability.
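The behavioral rules themselves are added to the agent as guidelines, each pairing a condition with an action. The sketch below keeps a few dealership guidelines as plain data and registers them through `agent.create_guideline(condition=..., action=...)`, the call Parlant's SDK documentation describes for adding guidelines. Treat the exact signature as an assumption and check it against your installed version. The guideline texts, `DEALERSHIP_GUIDELINES`, and `register_guidelines` are illustrative, and the demo uses a stand-in agent so it runs without a server.

```python
import asyncio
from typing import Any, Protocol


class SupportsGuidelines(Protocol):
    """Anything with an async create_guideline(condition=..., action=...) method."""

    async def create_guideline(self, *, condition: str, action: str) -> Any: ...


# Illustrative rules for the car-dealership agent (hypothetical content).
DEALERSHIP_GUIDELINES = [
    {
        "condition": "The customer asks about the price of a vehicle",
        "action": "Quote only the listed price and offer to connect them with a salesperson",
    },
    {
        "condition": "The customer wants to book a test drive",
        "action": "Ask for their preferred date and the model they want to drive",
    },
]


async def register_guidelines(
    agent: SupportsGuidelines, guidelines: list[dict[str, str]]
) -> int:
    """Register each guideline on the agent and return how many were added."""
    for guideline in guidelines:
        await agent.create_guideline(
            condition=guideline["condition"],
            action=guideline["action"],
        )
    return len(guidelines)


class RecordingAgent:
    """Stand-in agent that records calls instead of talking to a Parlant server."""

    def __init__(self) -> None:
        self.created: list[tuple[str, str]] = []

    async def create_guideline(self, *, condition: str, action: str) -> None:
        self.created.append((condition, action))


if __name__ == "__main__":
    demo_agent = RecordingAgent()
    count = asyncio.run(register_guidelines(demo_agent, DEALERSHIP_GUIDELINES))
    print(f"Registered {count} guidelines")
```

In serverTest.py, the equivalent call would go right after `create_agent`: `await register_guidelines(agent, DEALERSHIP_GUIDELINES)`.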

For more advanced configurations and usage patterns, be sure to check out the Parlant Documentation.
