from collections.abc import Sequence
from datetime import datetime
from typing import Any
from django.db.models import Manager, Model
from django.db.models.query import QuerySet
from django.utils import timezone
from kafka import KafkaProducer # type: ignore
from .constants import TYPE_CREATE, TYPE_DELETE, TYPE_REFRESH, TYPE_UPDATE
from .registry import get_registry, get_streamer
from .squashing import add_to_squash, is_squashing
from .stream import Streamer
from .types import ObjectID, RefreshFinalizeType
[docs]
def send(
objects: Sequence[Model],
manager: Manager | None = None,
objects_ids: Sequence[ObjectID] | None = None,
msg_type: str | None = None,
timestamp: datetime | None = None,
batch_size: int | None = None,
batch_kwargs: dict[str, Any] | None = None,
producer: KafkaProducer | None = None,
flush: bool = True,
) -> int:
"""
Sends objects to the associated streamer.
"""
if manager is not None:
model = manager.model
elif isinstance(objects, (Manager, QuerySet)):
model = objects.model
else:
if not objects:
return 0
model = objects[0].__class__
streamer = get_streamer(model)
if streamer is None:
return 0
messages = streamer.get_messages_for_objects(
objects,
manager=manager,
objects_ids=objects_ids,
msg_type=msg_type,
timestamp=timestamp,
batch_size=batch_size,
batch_kwargs=batch_kwargs,
)
if is_squashing():
count = add_to_squash(model, streamer, messages)
else:
count = streamer.send_messages(
messages,
batch_size=batch_size,
producer=producer,
flush=flush,
)
return count
[docs]
def send_create(objects: Sequence[Model], **kwargs: Any) -> int:
"Alias for ``send(objects, msg_type=TYPE_DELETE, ...)``."
return send(objects, msg_type=TYPE_CREATE, **kwargs)
[docs]
def send_update(objects: Sequence[Model], **kwargs: Any) -> int:
"Alias for ``send(objects, msg_type=TYPE_UPDATE, ...)``."
return send(objects, msg_type=TYPE_UPDATE, **kwargs)
[docs]
def send_delete(objects: Sequence[Model], **kwargs: Any) -> int:
"Alias for ``send(objects, msg_type=TYPE_DELETE, ...)``."
return send(objects, msg_type=TYPE_DELETE, **kwargs)
[docs]
def send_refresh(objects: Sequence[Model], **kwargs: Any) -> int:
"Alias for ``send(objects, msg_type=TYPE_REFRESH, ...)``."
return send(objects, msg_type=TYPE_REFRESH, **kwargs)
[docs]
def full_refresh(
model_or_manager: type[Model] | Manager | None = None,
producer: KafkaProducer | None = None,
flush: bool = True,
) -> int:
"""
Does full refresh for model or manager. Sends refresh message for each
object, then sends enumerate message with objects IDs or EOS (end of
stream).
"""
def _refresh(
streamer: Streamer,
manager: Manager,
producer: KafkaProducer,
flush: bool,
timestamp: datetime | None = None,
) -> int:
if timestamp is None:
timestamp = timezone.now()
queryset = manager.all()
objects_ids = list(queryset.order_by().values_list("pk", flat=True))
count = streamer.send_objects(
queryset,
manager=manager,
objects_ids=objects_ids,
msg_type=TYPE_REFRESH,
timestamp=timestamp,
producer=producer,
flush=False,
)
if streamer.refresh_finalize_type == RefreshFinalizeType.ENUMERATE:
count += streamer.send_ids_enumerate(
objects_ids,
manager=manager,
timestamp=timestamp,
producer=producer,
flush=flush,
)
elif streamer.refresh_finalize_type == RefreshFinalizeType.EOS:
count += streamer.send_eos(
timestamp=timestamp, producer=producer, flush=flush
)
return count
streamer_manager_list: list[tuple[Streamer, Manager]] = []
if model_or_manager is None:
streamer_manager_list.extend(
[(streamer, model._default_manager) for model, streamer in get_registry()]
)
elif isinstance(model_or_manager, Manager):
manager = model_or_manager
model = manager.model
streamer = get_streamer(model)
if streamer is not None:
streamer_manager_list.append((streamer, manager))
else:
model = model_or_manager
manager = model._default_manager
streamer = get_streamer(model)
if streamer is not None:
streamer_manager_list.append((streamer, manager))
count = 0
for streamer, manager in streamer_manager_list:
if producer is None:
producer = streamer.get_producer()
count += _refresh(streamer, manager, producer, flush)
return count