Recently someone working at Yahoo emailed me regarding an old thread I started on the Apache Flink user mailing list. I replied to the email, but also decided to turn the reply into a blog post because it might help other people as well.
I was able to get it working after tinkering with it. The issue was mainly a miscommunication: we didn't know for certain which authentication method we were using in AWS. We were using only the s3:// scheme.
Here are our configuration options:
On S3 compatible storage:
fs.s3a.access.key: ""
fs.s3a.secret.key: ""
fs.s3a.connection.ssl.enabled: "false"
fs.s3a.endpoint: "ceph-mcr-1.xxx.xxx.xxx:xxx"
fs.s3a.list.version: "1"
s3.path.style.access: "true"
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
state.backend: "rocksdb"
state.backend.incremental: "true"
state.checkpoints.dir: "s3://bucket-name/checkpoints/$cluster_name$/"
state.savepoints.dir: "s3://bucket-name/savepoints/$cluster_name$/"
On S3 with AWS:
fs.s3a.aws.credentials.provider: "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
containerized.master.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-hadoop-1.13.2.jar"
state.backend: "rocksdb"
state.backend.incremental: "true"
state.checkpoints.dir: "s3://xxx/checkpoints/$cluster_name$/"
state.savepoints.dir: "s3://xxx/savepoints/$cluster_name$/"
fs.s3a.aws.credentials.provider was the setting we were missing: it selects the authentication method (credentials provider). It's not mentioned in the Hadoop plugin docs, but the provider class is documented in the AWS SDK for Java docs. AWS mounts the secrets inside the Flink pods, so using this provider should work without further configuration.
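For context, here is a sketch of where those mounted secrets usually come from. It assumes the cluster runs on EKS with IAM Roles for Service Accounts, which is my guess rather than something confirmed for every setup. In that case, annotating the service account used by the Flink pods makes EKS inject the AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables, which WebIdentityTokenCredentialsProvider reads. The service account name, namespace, account ID and role name below are placeholders.

```yaml
# Sketch only: assumes EKS with IAM Roles for Service Accounts (IRSA).
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink                 # placeholder: the service account your Flink pods run as
  namespace: flink            # placeholder namespace
  annotations:
    # Placeholder ARN: an IAM role that is allowed to access your S3 bucket.
    eks.amazonaws.com/role-arn: "arn:aws:iam::111122223333:role/flink-s3-access"
```

If your pods get their credentials some other way, a different credentials provider may be the right one.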
Note that flink-s3-fs-hadoop-1.13.2.jar needs to be adapted to your Flink version. $cluster_name$ should also be substituted with your cluster/deployment name.
That's pretty much it. I'm also attaching the Flink S3 docs to the email. Thanks for reaching out, and I hope you'll figure it out!
As a side note, if you're using the Flink Operator to deploy your Flink job, you can set environment variables in the pod template file instead of flink-conf.yaml.
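To illustrate the side note, here is a minimal pod template sketch. It assumes the format used by Flink's native Kubernetes integration (a file referenced through kubernetes.pod-template-file, where the main container must be named flink-main-container). Your operator may expose the pod template under a different field, so treat the structure as an assumption and check its docs. The metadata name is arbitrary.

```yaml
# Sketch only: enables the S3 plugin through an environment variable
# instead of the containerized.*.env.* options in the Flink configuration.
apiVersion: v1
kind: Pod
metadata:
  name: pod-template          # arbitrary name
spec:
  containers:
    - name: flink-main-container   # name expected by Flink's native Kubernetes pod templates
      env:
        - name: ENABLE_BUILT_IN_PLUGINS
          value: "flink-s3-fs-hadoop-1.13.2.jar"   # adapt to your Flink version
```

With this in place, the two containerized.*.env.ENABLE_BUILT_IN_PLUGINS lines should no longer be needed in the configuration.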
Thanks for reading!