Setup streamers

In order to django-kafka-streamer to know which modules to stream, you need to create a streamers.py file in the app directory and define a streamer class for each model that needs to be streamed. The minimum configuration is:

from kafkastreamer import Streamer, register
from .models import MyModel

@register(MyModel)
class MyModelStreamer(Streamer):
    topic = "my-topic"

For example your model definition is:

class MyModel(models.Model):
    field1 = models.IntegerField()
    field2 = models.CharField(max_length=10)

Then when a new instance of MyModel is created. The following data will be send to the my-topic Kafka topic:

{
    "_time": "2023-01-01T00:00:00Z",
    "_type": "create",
    "id": 1,
    "field1": 1,
    "field2": "abc"
}

You can use the exclude attribute to exclude some fields:

@register(MyModel)
class MyModelStreamer(Streamer):
    topic = "my-topic"
    exclude = ["field2"]

Then the output will be:

{
    "_time": "2023-01-01T00:00:00Z",
    "_type": "create",
    "id": 1,
    "field1": 1
}

The model can have related fields. For example:

class OtherModel(models.Model):
    other_field = models.IntegerField()

class MyModel(models.Model):
    field1 = models.IntegerField()
    field2 = models.CharField(max_length=10)
    other = models.ForeignKey(OtherModel)

To make a related object appear in the output, add it to include and select_related lists:

@register(MyModel)
class MyModelStreamer(Streamer):
    topic = "my-topic"
    include = ["other"]
    select_related = ["other"]

Then the output will be:

{
    "_time": "2023-01-01T00:00:00Z",
    "_type": "create",
    "id": 1,
    "field1": 1
    "field2": "abc"
    "other_id": 2,
    "other": {
        "id": 2,
        "other_field": 3,
    }
}

To re-stream the data when the related object of OtherModel is changed, add it to the handle_related list:

@register(MyModel)
class MyModelStreamer(Streamer):
    topic = "my-topic"
    include = ["other"]
    select_related = ["other"]
    handle_related = ["other"]

To add additional fields with constant values, use the static_fields dictionary:

@register(MyModel)
class MyModelStreamer(Streamer):
    topic = "my-topic"
    static_fields = {"model": "MyModel"}

To add additional field with calculated value, define a load_<field> method, and add the field to the include list:

@register(MyModel)
class MyModelStreamer(Streamer):
    topic = "my-topic"
    include = ["field1_plus_one"]

    def load_field1_plus_one(self, obj, batch):
        return obj.field1 + 1