Source code for kafkastreamer.squashing

from collections.abc import Generator, Iterable
from contextlib import contextmanager
from typing import TypeAlias

from django.db.models import Model

from .constants import TYPE_CREATE, TYPE_DELETE, TYPE_UPDATE
from .context import _context
from .stream import Streamer
from .types import Message, ObjectID

SquashItem: TypeAlias = dict[ObjectID, Message]
SquashDict: TypeAlias = dict[type[Model], SquashItem]

_context.squash = None


def is_squashing() -> bool:
    return getattr(_context, "squash", None) is not None


[docs] @contextmanager def squash() -> Generator[None, None]: """ Context manager to squash messages. Within this context manager messages are not sent immediately but accumulate in the buffer. Items in buffer is grouped by object ID, so that later items override earliest to avoid unnecessary messages that represent an intermediate state. """ from .registry import get_streamer is_top = not is_squashing() if is_top: squash: SquashDict = {} _context.squash = squash try: yield finally: if is_top: for model, messages_d in _context.squash.items(): streamer = get_streamer(model) if streamer is not None: messages = messages_d.values() streamer.send_messages(messages) _context.squash = None
def add_to_squash( model: type[Model], streamer: Streamer, messages: Iterable[Message], ) -> int: assert _context.squash is not None squash: SquashDict = _context.squash squash.setdefault(model, {}) messages_d = squash[model] count = 0 for msg in messages: obj_id = msg.data[streamer.id_field] if obj_id in messages_d: prev_type = messages_d[obj_id].meta.msg_type cur_type = msg.meta.msg_type if prev_type == TYPE_CREATE and cur_type == TYPE_UPDATE: msg = msg._replace( meta=msg.meta._replace(msg_type=TYPE_CREATE), ) elif prev_type == TYPE_CREATE and cur_type == TYPE_DELETE: del messages_d[obj_id] continue messages_d[obj_id] = msg count += 1 return count