Sari la conținut
Background Image
  1. Posts/

Apache Kafka: Cum să setezi offset-urile la un timp fix

Denis Nutiu
Autor
Denis Nutiu
I’m Denis, a Software Engineer living in Romania. I’m passionate about cloud computing and software development.

blog thumbnail

Salut 👋,

Acesta este un articol scurt despre setarea offset-urilor în Apache Kafka pentru un grup de consumatori.

În mod normal, pentru a reseta offset-urile în Kafka trebuie să folosești unealta kafka-consumer-groups.sh, ceea ce înseamnă descărcarea arhivei zip cu codul sursă al Kafka și configurarea SDK-ului Java. Toate uneltele Kafka depind de Java și acest lucru nu este foarte plăcut sau prietenos pentru dezvoltatori…

Uneori, chiar dacă instalezi Java corect și pornești uneltele, acestea nu funcționează 🤷🏻‍♂️. Fie versiunile uneltelor sunt incompatibile cu versiunea Kafka de pe server, fie comanda se execută cu succes, dar nu pare să facă nimic…

O altă metodă de a seta offset-urile pentru un consumator este să folosești o bibliotecă Kafka și să o faci prin cod.

Am Python instalat pe sistemul meu și tot ce trebuie să fac este să instalez biblioteca confluent-kafka:

1
pip install confluent-kafka

Și apoi să rulez următorul fragment de cod pentru a reseta offset-urile consumatorului la un anumit timestamp:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from confluent_kafka import Consumer, TopicPartition
import time

# Configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.partition.eof': True
}

topic = 'my-topic'
timestamp_ms = int(time.mktime(time.strptime("2025-04-01 12:00:00", "%Y-%m-%d %H:%M:%S")) * 1000) # or time in miliseconds

# Create consumer
consumer = Consumer(consumer_config)

# Get metadata to discover partitions
metadata = consumer.list_topics(topic)
partitions = [TopicPartition(topic, p.id, timestamp_ms) for p in metadata.topics[topic].partitions.values()]

# Lookup offsets for the timestamp
offsets = consumer.offsets_for_times(partitions, timeout=10.0)

# Assign partitions with correct offsets
consumer.assign(offsets)

# Start consuming
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Error:", msg.error())
            continue

        print(f"{msg.topic()} [{msg.partition()}] at offset {msg.offset()}: {msg.value().decode('utf-8')}")
        break

except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Asta e tot, mulțumesc pentru lectură!

Related

Modelul jetonului de anulare în Python
Salut! 👋 Articolul Modelul jetonului de anulare este un model inspirat de structura CancellationToken din C# și pachetul context din Golang.
Etichetarea imaginilor cu învățare profundă
Salutare tuturor 👋, Am jucat cu FastAI și învățarea profundă de o săptămână sau cam așa ceva și am decis să combin pasiunea mea pentru software cu pasiunea mea pentru fotografie. Lucrez la o aplicație sau poate la o bibliotecă care vă permite să generați etichete pentru o fotografie. Pentru un fotograf amator ca mine este destul de util, deoarece nu am întotdeauna inspirație să scriu etichete pentru fotografiile mele și uneori etichetele pe care le scriu sunt inconsecvente.
Implementați un model FastAI cu FastAPI
Salutare tuturor 👋, În această postare rapidă vă voi arăta cum să implementați un model FastAI folosind FastAPI. Este o continuare a postării mele anterioare de aici și se bazează parțial pe această prelegere FastAI: