Apache Flink Checkpoints on S3 and S3-compatible storage

Hello,

Recently, someone working at Yahoo emailed me about an old thread I had started on the Apache Flink user mailing list. I replied to the email, but I also decided to turn the reply into a blog post, because it might help other people as well.

Email

Hi,

I was able to get it working after tinkering with it. The issue was mainly a miscommunication: we didn’t know for certain which authentication method we were using in AWS. We were only using the s3:// scheme.

Here are our configuration options:

On S3-compatible storage:

fs.s3a.access.key: ""
fs.s3a.secret.key: ""
fs.s3a.connection.ssl.enabled: "false"
fs.s3a.endpoint: "ceph-mcr-1.xxx.xxx.xxx:xxx"
fs.s3a.list.version: "1"
s3.path.style.access: "true"
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
state.backend: "rocksdb"
state.backend.incremental: "true"
state.checkpoints.dir: "s3://bucket-name/checkpoints/$cluster_name$/"
state.savepoints.dir: "s3://bucket-name/savepoints/$cluster_name$/"

On AWS S3:

fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
state.backend: "rocksdb"
state.backend.incremental: "true"
state.checkpoints.dir: "s3://xxx/checkpoints/$cluster_name$/"
state.savepoints.dir: "s3://xxx/savepoints/$cluster_name$/"

fs.s3a.aws.credentials.provider was the authentication method (credentials provider) that we were missing. It’s not mentioned in the Hadoop plugin docs[2], but it is found in the AWS Java SDK docs[3][4]. AWS mounts the secrets inside the Flink pods, so using this provider should make authentication work without further configuration.

Note that flink-s3-fs-hadoop-1.13.2.jar needs to be adapted to your Flink version, and $cluster_name$ should be substituted with your cluster/deployment name.

That’s pretty much it; I’m also attaching the Flink S3 docs[1] to the email. Thanks for reaching out! Hope you’ll figure it out!

Best,

Denis Nutiu

[1] docs/deployment/filesystems/s3/
[2] hadoop-aws/tools/hadoop-aws/index.html#S3A
[3] amazonaws/auth/AWSCredentialsProvider.html
[4] amazonaws/auth/WebIdentityTokenCredentialsProvider.html

As a side note, if you’re using the Flink Operator to deploy your Flink job, you can set the environment variables in the pod template file instead of flink-conf.yaml, as in the sketch below.
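For example, with the official flink-kubernetes-operator, the pod template is embedded in the FlinkDeployment resource. Here’s a minimal sketch, assuming that operator; the deployment name is illustrative:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job  # illustrative name
spec:
  podTemplate:
    spec:
      containers:
        # The operator merges this into the main container, which must be
        # named flink-main-container.
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.13.2.jar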

Thanks for reading!

Using confluent-kafka-go on macOS M1

Hello,

TL;DR:

brew install librdkafka openssl@3 pkg-config
export PKG_CONFIG_PATH="/opt/homebrew/opt/openssl@3/lib/pkgconfig"
go test -tags dynamic ./...

I’ve been transitioning from a Linux machine to a macOS M1 machine at work, and when I ran the tests for a Go project, I noticed that they failed on modules depending on librdkafka.

Initially, I had problems with Kafka on macOS M1 in Docker, since I was using an older image version that didn’t have an arm64 build; updating the images to version 7.2.1[1] fixed those issues.

This time, however, the problem was with my Go dependencies and not with the Docker containers. Running the tests resulted in:

go test ./...
[...]
ld: warning: ignoring file /Users/dnutiu/go/pkg/mod/github.com/confluentinc/confluent-kafka-go@v1.8.2/kafka/librdkafka_vendor/librdkafka_darwin.a, building for macOS-arm64 but attempting to link with file built for macOS-x86_64
Undefined symbols for architecture arm64:

My guess is that the published confluent-kafka-go package does not (yet) bundle librdkafka symbols for arm64. To fix the issue, you can make the module use another librdkafka build.

When you install the librdkafka formula from Homebrew, the library is built for the arm64 architecture. To install it, run:

brew install librdkafka openssl@3 pkg-config

Next, we’ll use a tool called pkg-config to tell the build where to find librdkafka and the libraries it depends on; since librdkafka depends on openssl, we need to export PKG_CONFIG_PATH. To grab the value, run:

brew info openssl
==> openssl@3: stable 3.0.5 (bottled) [keg-only]
Cryptography and SSL/TLS Toolkit
https://openssl.org/
/opt/homebrew/Cellar/openssl@3/3.0.5 (6,444 files, 27.9MB)
  Poured from bottle on 2022-08-31 at 14:10:49
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/openssl@3.rb
License: Apache-2.0
==> Dependencies
Required: ca-certificates ✘
==> Caveats
A CA file has been bootstrapped using certificates from the system
keychain. To add additional certificates, place .pem files in
  /opt/homebrew/etc/openssl@3/certs

and run
  /opt/homebrew/opt/openssl@3/bin/c_rehash

openssl@3 is keg-only, which means it was not symlinked into /opt/homebrew,
because macOS provides LibreSSL.

If you need to have openssl@3 first in your PATH, run:
  echo 'export PATH="/opt/homebrew/opt/openssl@3/bin:$PATH"' >> ~/.zshrc

For compilers to find openssl@3 you may need to set:
  export LDFLAGS="-L/opt/homebrew/opt/openssl@3/lib"
  export CPPFLAGS="-I/opt/homebrew/opt/openssl@3/include"

For pkg-config to find openssl@3 you may need to set:
  export PKG_CONFIG_PATH="/opt/homebrew/opt/openssl@3/lib/pkgconfig"

Check that everything works by running:

pkg-config --libs --cflags rdkafka

-I/opt/homebrew/Cellar/openssl@3/3.0.5/include -I/opt/homebrew/Cellar/librdkafka/1.9.2/include -I/opt/homebrew/Cellar/zstd/1.5.2/include -I/opt/homebrew/Cellar/lz4/1.9.4/include -L/opt/homebrew/Cellar/librdkafka/1.9.2/lib -lrdkafka

Now, all you need to do is run your tests with the -tags dynamic flag; it instructs confluent-kafka-go to dynamically link against the librdkafka that we installed with Homebrew.

Thanks for reading and happy hacking! 🫶

How to identify similar images using hashing

Hi 👋,

In this article I would like to talk about image hashing.

Image hashing algorithms are specialized hashing functions that output the hash of an image based on the image’s properties. Duplicate images produce the same hash value, and visually similar images produce hash values that differ only slightly.

To simplify:

hash("white_cat") = "aaaa"
hash("brown_cat") = "aaba"
hash("car") = "xkjwe"

Some use cases for image hashing are:

  • Duplicate Image Detection
  • Anti-Impersonation / Image Stealing
  • Image filtering
  • Reverse image search

Let’s play around with image hashing techniques using Python and the ImageHash library. Install the library with:

pip install imagehash
pip install six

To obtain some sample images, I used Pexels and searched for terms like “white cat” and “firetruck”.

Here are the images that I’m using: cat1, cat2, cat3 and firetruck1.

I’m going to import the necessary modules and add a function that converts the hexadecimal string given by ImageHash into an integer.

from PIL import Image
import imagehash


def hash_to_int(img_hash: imagehash.ImageHash) -> int:
    # ImageHash's string representation is hexadecimal, so parse it with base 16.
    return int(str(img_hash), 16)

The reason for the hash_to_int function is that it is much easier to do computations with integers than with strings. In the future, if we build a service that makes use of image hashing and computes Hamming distances, we can store the integer hashes in an OLAP database such as ClickHouse and use its bitHammingDistance function to compute the Hamming distance.

The next snippet of code opens the images and computes their average and color hashes. Then, for every image in the dataset, it computes the Hamming distance between the source’s and the image’s average hashes, summed with the Hamming distance between their color hashes.

The lower the Hamming distance, the more similar the images. A Hamming distance of 0 means the images are equal.

def main():
    images = [
        Image.open("cat1.jpg"),
        Image.open("cat2.jpg"),
        Image.open("cat3.jpg"),
        Image.open("firetruck1.jpg")
    ]

    average_hashes = [hash_to_int(imagehash.average_hash(image)) for image in images]
    color_hashes = [hash_to_int(imagehash.colorhash(image)) for image in images]

    image_hashes = list(zip(images, average_hashes, color_hashes))

    source = image_hashes[0]

    for image in image_hashes:
        # XOR highlights the bits where the two hashes differ;
        # counting the 1 bits gives the Hamming distance.
        hamming_average_hash = bin(source[1] ^ image[1]).count("1")
        hamming_color_hash = bin(source[2] ^ image[2]).count("1")
        # Sum both distances into a single similarity score.
        hamming_distance = hamming_average_hash + hamming_color_hash
        print("Hamming Distance between", source[0].filename, "and", image[0].filename, "is", hamming_distance)


if __name__ == '__main__':
    main()

To compute the Hamming distance, you XOR the two integers and then count the number of 1 bits: bin(source[1] ^ image[1]).count("1"). That’s it.
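Here’s a minimal standalone sketch with two made-up 8-bit hash values; on Python 3.10+ you could also write (a ^ b).bit_count():

# Two made-up hashes that differ in exactly two bit positions.
a = 0b10110100
b = 0b10010110

# XOR marks the differing bits, then we count the 1 bits.
print(bin(a ^ b).count("1"))  # prints 2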

If we run the program with the source variable set to cat1.jpg (source = image_hashes[0]), we get the following result:

Hamming Distance between cat1.jpg and cat1.jpg is 0
Hamming Distance between cat1.jpg and cat2.jpg is 36
Hamming Distance between cat1.jpg and cat3.jpg is 39
Hamming Distance between cat1.jpg and firetruck1.jpg is 33

If we look at our dataset, the first image, cat1, is somewhat visually similar to the image of the firetruck, which explains why firetruck1 scores the lowest distance (33).

If we run the program with the source variable set to cat2.jpg, we can see that cat2 is most similar to cat3, since both images contain white cats.

Hamming Distance between cat2.jpg and cat1.jpg is 36
Hamming Distance between cat2.jpg and cat2.jpg is 0
Hamming Distance between cat2.jpg and cat3.jpg is 23
Hamming Distance between cat2.jpg and firetruck1.jpg is 47

Conclusion

We used a Python image hashing library to compute the average and color hashes of some images, and then we determined which images are similar to each other by computing the Hamming distance of the hashes.

Thanks for reading and build something fun! 🔨

Full Code

"""
pip install imagehash
pip install six
"""
from PIL import Image
import imagehash


def hash_to_int(img_hash: imagehash.ImageHash) -> int:
    # ImageHash's string representation is hexadecimal, so parse it with base 16.
    return int(str(img_hash), 16)


def main():
    images = [
        Image.open("cat1.jpg"),
        Image.open("cat2.jpg"),
        Image.open("cat3.jpg"),
        Image.open("firetruck1.jpg")
    ]

    average_hashes = [hash_to_int(imagehash.average_hash(image)) for image in images]
    color_hashes = [hash_to_int(imagehash.colorhash(image)) for image in images]

    image_hashes = list(zip(images, average_hashes, color_hashes))

    source = image_hashes[0]

    for image in image_hashes:
        # XOR highlights the bits where the two hashes differ;
        # counting the 1 bits gives the Hamming distance.
        hamming_average_hash = bin(source[1] ^ image[1]).count("1")
        hamming_color_hash = bin(source[2] ^ image[2]).count("1")
        # Sum both distances into a single similarity score.
        hamming_distance = hamming_average_hash + hamming_color_hash
        print("Hamming Distance between", source[0].filename, "and", image[0].filename, "is", hamming_distance)


if __name__ == '__main__':
    main()

De Morgan’s Law

De Morgan’s law is a simple law that I learned at UPT during one of my hardware classes. While it is useful in hardware, it is also useful when writing programs.

If you have a condition like not (a and b), you can rewrite it as (not a) or (not b). Dually, not (a or b) is equivalent to (not a) and (not b).

if __name__ == '__main__':
    a = True
    b = True

    # not (a and b)
    if not (a and b):
        print("True")
    else:
        print("False")

    # Equivalent, by De Morgan's law: (not a) or (not b)
    if not a or not b:
        print("True")
    else:
        print("False")