


The client

First we import the core classes and modules from the standard library

import asyncio
from enum import Enum
from functools import cache, cached_property
from pathlib import Path
from typing import Any, Callable, Coroutine, Tuple, Union

then the faust-related objects

from faust import App, Worker, Record, uuid

and finally all the record types defined in this package

from .types import Deal, Duel, Offer, SexDigest, YellowPage, AuctionDigest

The Client

The client is a thin wrapper that composes faust objects

class Client:

This mapping defines which record models are allowed and which Kafka topic each one maps to.

    allowed_records = {
        Deal: "deals",
        Duel: "duels",
        Offer: "offers",
        SexDigest: "sex_digest",
        YellowPage: "yellow_pages",
        AuctionDigest: "au_digest",
    }

and this enum defines the supported Chat Wars versions

    class Version(Enum):
        CW2 = "cw2"
        CW3 = "cw3"

The Client.__init__ method accepts:

  • id: the client id
  • version: the Chat Wars version to consume
  • broker: the Kafka broker address
  • work_dir: the working directory (defaults to ./.fchatwars)
  • loop: the event loop to run in
  • debug: enables debugging with aiomonitor
  • loglevel: the logging level

This initializes a Kafka consumer (be careful: it is only a consumer) with a provided or auto-generated id. It consumes the topics of the provided Chat Wars version, defaulting to cw2. The consumer listens on the provided broker and runs in the provided event loop, or in the one returned by asyncio.get_event_loop() if none is given.

    def __init__(
        self,
        id: str = None,
        version: "Client.Version" = None,
        broker: str = "kafka://",
        work_dir: Union[str, Path] = "./.fchatwars",
        loop: asyncio.AbstractEventLoop = None,
        debug: bool = False,
        loglevel: Union[str, int] = "warning",
    ) -> None:

        self._loop = loop if loop is not None else asyncio.get_event_loop()
        self._version = version if version is not None else self.Version.CW2
        self._debug = debug
        self._logleve = loglevel

        self._app = App(
            id=uuid() if id is None else id,
        self._workdir = Path(work_dir)

The properties are straightforward, so no explanation is needed

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        return self._loop

    @property
    def workdir(self) -> Path:
        return self._workdir

    @property
    def version(self) -> "Client.Version":
        return self._version

    @property
    def debug(self) -> bool:
        return self._debug

    @property
    def loglevel(self) -> Union[str, int]:
        return self._logleve

This builds the faust topic for a record, with some caching (internal use only)

    @cache
    def _topic(self, record: Record):
        topic_name = f"{self.version.value}-{self.allowed_records[record]}"
        return self._app.topic(topic_name, value_type=record)
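The topic name is simply the version value joined to the mapped topic suffix. A minimal sketch of that naming rule, without faust or Kafka: the `ALLOWED` dict keyed by plain strings and the `topic_name` helper are hypothetical stand-ins for `allowed_records` and `_topic`, used only to keep the example dependency-free.

```python
from enum import Enum
from functools import cache


class Version(Enum):
    CW2 = "cw2"
    CW3 = "cw3"


# Hypothetical stand-in for Client.allowed_records: strings replace the
# record classes so the sketch runs without the package installed.
ALLOWED = {"Deal": "deals", "Duel": "duels", "AuctionDigest": "au_digest"}


@cache  # repeated lookups for the same (version, record) reuse the result
def topic_name(version: Version, record: str) -> str:
    # Same f-string shape as in _topic: "<version>-<topic suffix>"
    return f"{version.value}-{ALLOWED[record]}"


print(topic_name(Version.CW2, "Deal"))
print(topic_name(Version.CW3, "AuctionDigest"))
```

Caching works here because enum members and strings are hashable; a record that is not in the mapping raises KeyError before anything is cached.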

A partial exposure of the faust agent decorator. An agent is, in this case, an asynchronous stream consumer

    def agent(self, record):
        assert record in self.allowed_records, "This is not an allowed Chat Wars Record"
        return self._app.agent(self._topic(record))
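To illustrate the pattern of a guarded decorator factory feeding an asynchronous stream consumer, here is a sketch that runs without faust. `MiniClient`, its `handlers` registry, and `fake_stream` are all hypothetical; the real method delegates to faust instead of storing the handler, and the stream would carry record instances rather than strings.

```python
import asyncio


class MiniClient:
    """Hypothetical stand-in for Client: no faust, no Kafka."""

    allowed_records = {"Deal": "deals"}

    def __init__(self):
        self.handlers = {}

    def agent(self, record):
        # Guard first, then hand back a decorator, as in the method above.
        if record not in self.allowed_records:
            raise ValueError("This is not an allowed Chat Wars Record")

        def decorator(fn):
            self.handlers[record] = fn
            return fn

        return decorator


client = MiniClient()
seen = []


@client.agent("Deal")
async def on_deals(stream):
    # The consumer iterates the stream asynchronously, one event at a time.
    async for deal in stream:
        seen.append(deal)


async def fake_stream():
    # Placeholder events; these are not real Chat Wars payloads.
    for event in ("deal-1", "deal-2"):
        yield event


asyncio.run(client.handlers["Deal"](fake_stream()))
print(seen)
```

The sketch raises ValueError where the method above uses assert. That is a deliberate variation: assert statements are skipped when Python runs with -O, so an explicit exception keeps the check active in optimized runs.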

This directly exposes the faust timer decorator

    def timer(self, seconds):
        return self._app.timer(interval=seconds)

Returns a pair of functions, one to start the worker and one to stop it (internal use only)

    @cached_property
    def _driven_functions(self) -> Tuple[Callable, Callable]:

        worker = Worker(
        )  # Unimportant settings are removed or preset when the worker is created

        worker.spinner = None  # Remove the annoying spinner

        return (
            lambda: self.loop.run_until_complete(worker.start()),

This method starts the app execution loop

    def start(self):
        return self._driven_functions[0]()

This method stops the app execution loop

    def stop(self):
        return self._driven_functions[1]()
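The start/stop pair can be sketched in isolation. This is an assumption-heavy illustration, not the class above: `DummyWorker` and `Runner` are hypothetical, the stop lambda is a guess at the symmetric counterpart of the start lambda, and `cached_property` is assumed so that both methods share a single worker.

```python
import asyncio
from functools import cached_property
from typing import Callable, Tuple


class DummyWorker:
    """Hypothetical worker exposing only async start() and stop()."""

    def __init__(self):
        self.events = []

    async def start(self):
        self.events.append("started")

    async def stop(self):
        self.events.append("stopped")


class Runner:
    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.created = 0

    @cached_property
    def _driven_functions(self) -> Tuple[Callable, Callable]:
        # Evaluated once; later accesses return the same two closures.
        self.created += 1
        self.worker = DummyWorker()
        worker = self.worker
        return (
            lambda: self.loop.run_until_complete(worker.start()),
            # Assumed counterpart of the start function.
            lambda: self.loop.run_until_complete(worker.stop()),
        )

    def start(self):
        return self._driven_functions[0]()

    def stop(self):
        return self._driven_functions[1]()


runner = Runner()
runner.start()
runner.stop()
runner.loop.close()
print(runner.worker.events, runner.created)
```

Without the caching, each access would build a fresh worker, so stop() would act on a different worker than the one start() launched.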

This method runs the app in a fancy, controlled way

    def run(self):

A method for CLI use, if you want it

    def cli(self):
        return self._app.main()


I have some doubts about whether to keep the faust RocksDB integration or not. I'm going to leave it out for now.

Now we are ready for some testing 👋