I’m Denis, a Software Engineer living in Romania. I’m passionate about cloud computing and software development.
Salut 👋,
Acesta este un articol scurt despre setarea offset-urilor în Apache Kafka pentru un grup de consumatori.
În mod normal, pentru a reseta offset-urile în Kafka trebuie să folosești unealta kafka-consumer-groups.sh, ceea ce înseamnă descărcarea arhivei zip cu codul sursă al Kafka și configurarea SDK-ului Java. Toate uneltele Kafka depind de Java și acest lucru nu este foarte plăcut sau prietenos pentru dezvoltatori…
Uneori, chiar dacă instalezi Java corect și pornești uneltele, acestea nu funcționează 🤷🏻♂️. Fie versiunile uneltelor sunt incompatibile cu versiunea Kafka de pe server, fie comanda se execută cu succes, dar nu pare să facă nimic…
O altă metodă de a seta offset-urile pentru un consumator este să folosești o bibliotecă Kafka și să o faci prin cod.
Am Python instalat pe sistemul meu și tot ce trebuie să fac este să instalez biblioteca confluent-kafka:
1
pip install confluent-kafka
Și apoi să rulez următorul fragment de cod pentru a reseta offset-urile consumatorului la un anumit timestamp:
fromconfluent_kafkaimportConsumer,TopicPartitionimporttime# Configurationconsumer_config={'bootstrap.servers':'localhost:9092','group.id':'my-consumer-group','auto.offset.reset':'earliest','enable.partition.eof':True}topic='my-topic'timestamp_ms=int(time.mktime(time.strptime("2025-04-01 12:00:00","%Y-%m-%d %H:%M:%S"))*1000)# or time in miliseconds# Create consumerconsumer=Consumer(consumer_config)# Get metadata to discover partitionsmetadata=consumer.list_topics(topic)partitions=[TopicPartition(topic,p.id,timestamp_ms)forpinmetadata.topics[topic].partitions.values()]# Lookup offsets for the timestampoffsets=consumer.offsets_for_times(partitions,timeout=10.0)# Assign partitions with correct offsetsconsumer.assign(offsets)# Start consumingtry:whileTrue:msg=consumer.poll(timeout=1.0)ifmsgisNone:continueifmsg.error():print("Error:",msg.error())continueprint(f"{msg.topic()} [{msg.partition()}] at offset {msg.offset()}: {msg.value().decode('utf-8')}")breakexceptKeyboardInterrupt:passfinally:consumer.close()
Salutare tuturor 👋,
Am jucat cu FastAI și învățarea profundă de o săptămână sau cam așa ceva și am decis să combin pasiunea mea pentru software cu pasiunea mea pentru fotografie. Lucrez la o aplicație sau poate la o bibliotecă care vă permite să generați etichete pentru o fotografie. Pentru un fotograf amator ca mine este destul de util, deoarece nu am întotdeauna inspirație să scriu etichete pentru fotografiile mele și uneori etichetele pe care le scriu sunt inconsecvente.
Salutare tuturor 👋,
În această postare rapidă vă voi arăta cum să implementați un model FastAI folosind FastAPI.
Este o continuare a postării mele anterioare de aici și se bazează parțial pe această prelegere FastAI: