Kafka¶
Since v0.24.0
Introduction¶
The Testcontainers module for Kafka.
This module runs Kafka in KRaft mode, that is, Apache Kafka without ZooKeeper.
Adding this module to your project dependencies¶
Please run the following command to add the Kafka module to your Go dependencies:
go get github.com/testcontainers/testcontainers-go/modules/kafka
Apache Kafka images¶
- Not available until the next release main
Images apache/kafka, apache/kafka-native (Apache Kafka) are supported by this module in addition to confluentinc/confluent-local (Confluent).
The native container (apache/kafka-native) is based on GraalVM and typically starts several seconds faster than alternatives.
Prefer Apache Kafka images over Confluent images, because the Confluent images have an unresolved issue with graceful shutdown.
Apache Kafka Native images are also the smallest; however, they do not include CLI tools such as kafka-topics.sh.
| Docker Image | Size | Start/stop time | CLI Tools | Graceful Shutdown |
|---|---|---|---|---|
| Apache Kafka Native | 137MB (4.0.1 linux amd) | <1 second | No | OK |
| Apache Kafka | 393MB (4.0.1 linux amd) | ~3-4 seconds | Yes | OK |
| Confluent Kafka | 649MB (7.5.0 linux amd) | ~13-14 seconds | Yes | issue |
Info
If you use an image from a custom registry, you might need to override the starter script; see the Starter script section below.
Usage example¶
ctx := context.Background()
kafkaContainer, err := kafka.Run(ctx,
"apache/kafka-native:4.0.1",
kafka.WithClusterID("test-cluster"),
)
defer func() {
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
if err != nil {
log.Printf("failed to start container: %s", err)
return
}
ctx := context.Background()
kafkaContainer, err := kafka.Run(ctx,
"apache/kafka:4.0.1",
kafka.WithClusterID("test-cluster"),
)
defer func() {
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
if err != nil {
log.Printf("failed to start container: %s", err)
return
}
ctx := context.Background()
kafkaContainer, err := kafka.Run(ctx,
"confluentinc/confluent-local:7.5.0",
kafka.WithClusterID("test-cluster"),
)
defer func() {
if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
log.Printf("failed to terminate container: %s", err)
}
}()
if err != nil {
log.Printf("failed to start container: %s", err)
return
}
Module Reference¶
Run function¶
- Since v0.32.0
Info
The RunContainer(ctx, opts...) function is deprecated and will be removed in the next major release of Testcontainers for Go.
The Kafka module exposes one entrypoint function to create the Kafka container, and this function receives three parameters:
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error)
- context.Context, the Go context.
- string, the Docker image to use.
- testcontainers.ContainerCustomizer, a variadic argument for passing options.
Image¶
Use the second argument in the Run function to set a valid Docker image.
For example: Run(context.Background(), "apache/kafka-native:4.0.1").
Warning
The module expects the image in use to support KRaft mode (Kafka without ZooKeeper).
The minimum required version of the Confluent images for KRaft mode is confluentinc/confluent-local:7.4.0.
All Apache images support KRaft mode.
Environment variables¶
The environment variables that are already set by default are:
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094,LOCALHOST://localhost:9095",
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094,LOCALHOST://localhost:9095",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,LOCALHOST:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
"KAFKA_BROKER_ID": "1",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS": "1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
"KAFKA_LOG_FLUSH_INTERVAL_MESSAGES": strconv.Itoa(math.MaxInt),
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": "0",
"KAFKA_NODE_ID": "1",
"KAFKA_PROCESS_ROLES": "broker,controller",
"KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
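To make the listener layout easier to read, here is a minimal sketch that splits a KAFKA_LISTENERS-style value into its named entries. The `listener` type and the `parseListeners` helper are hypothetical names introduced for illustration; they are not part of the Kafka module.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// listener is one NAME://host:port entry of a KAFKA_LISTENERS-style value.
// Hypothetical type, used only in this sketch.
type listener struct {
	Name string // listener name, e.g. "BROKER"
	Host string // bind host, e.g. "0.0.0.0"
	Port string // port, e.g. "9092"
}

// parseListeners splits a comma-separated list of NAME://host:port entries.
// Hypothetical helper, not part of the Kafka module.
func parseListeners(value string) ([]listener, error) {
	var out []listener
	for _, entry := range strings.Split(value, ",") {
		name, addr, ok := strings.Cut(entry, "://")
		if !ok {
			return nil, fmt.Errorf("malformed listener %q", entry)
		}
		host, port, err := net.SplitHostPort(addr)
		if err != nil {
			return nil, fmt.Errorf("listener %q: %w", entry, err)
		}
		out = append(out, listener{Name: name, Host: host, Port: port})
	}
	return out, nil
}

func main() {
	// Default value of KAFKA_LISTENERS as listed above.
	const defaults = "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094,LOCALHOST://localhost:9095"

	listeners, err := parseListeners(defaults)
	if err != nil {
		panic(err)
	}
	for _, l := range listeners {
		fmt.Printf("%s listens on %s port %s\n", l.Name, l.Host, l.Port)
	}
}
```

Read this way, the defaults define four listeners: PLAINTEXT on 9093 (the public port), BROKER on 9092, CONTROLLER on 9094, and LOCALHOST on 9095.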
Starter script¶
The Kafka container will be started using a custom shell script.
The module selects the starter script based on the image in use, using the following logic:
- image starts with apache/kafka or docker.io/apache/kafka: use the Apache Kafka starter script.
- image starts with confluentinc/ or docker.io/confluentinc/: use the Confluent starter script.
- otherwise: use the Confluent starter script (for backward compatibility).
See also WithApacheFlavor/WithConfluentFlavor and WithStarterScript options to override this behavior.
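The selection rules above can be sketched as a simple prefix check. This is only an illustration of the documented logic, not the module's actual code; the `inferFlavor` function and its return values are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// inferFlavor mirrors the documented selection rules.
// Hypothetical helper, not part of the Kafka module.
func inferFlavor(image string) string {
	switch {
	case strings.HasPrefix(image, "apache/kafka"),
		strings.HasPrefix(image, "docker.io/apache/kafka"):
		return "apache"
	case strings.HasPrefix(image, "confluentinc/"),
		strings.HasPrefix(image, "docker.io/confluentinc/"):
		return "confluent"
	default:
		// Unrecognized images fall back to the Confluent script
		// for backward compatibility.
		return "confluent"
	}
}

func main() {
	images := []string{
		"apache/kafka-native:4.0.1",
		"docker.io/apache/kafka:4.0.1",
		"confluentinc/confluent-local:7.5.0",
		// The custom registry prefix hides the "apache/kafka" prefix,
		// so the fallback applies.
		"custom-registry/apache/kafka-native:4.0.1",
	}
	for _, img := range images {
		fmt.Printf("%s => %s\n", img, inferFlavor(img))
	}
}
```

The last image shows why an override is needed for custom registries: a prefix check cannot see the apache/kafka part once a registry name precedes it, so the Confluent script would be chosen for an Apache image.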
apacheStarterScript = `#!/bin/bash
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092,LOCALHOST://localhost:9095
echo Starting Apache Kafka
exec /etc/kafka/docker/run`
confluentStarterScript = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092,LOCALHOST://localhost:9095
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
echo '' > /etc/confluent/docker/ensure
/etc/confluent/docker/configure
/etc/confluent/docker/launch`
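Both scripts are format templates with two %s placeholders. As a minimal sketch, assuming the first placeholder receives the externally advertised listener and the second the container hostname (an assumption based on where they appear in the template, not confirmed here), rendering could look like the following. The `renderStarterScript` helper, the port 55001, and the hostname kafka-1 are made up for illustration.

```go
package main

import "fmt"

// Copy of the Apache starter script template shown above.
const apacheStarterScript = `#!/bin/bash
export KAFKA_ADVERTISED_LISTENERS=%s,BROKER://%s:9092,LOCALHOST://localhost:9095
echo Starting Apache Kafka
exec /etc/kafka/docker/run`

// renderStarterScript fills the two placeholders of a starter script template.
// Hypothetical helper; the meaning of each placeholder is an assumption.
func renderStarterScript(template, publicListener, hostname string) string {
	return fmt.Sprintf(template, publicListener, hostname)
}

func main() {
	// Illustrative values only: a mapped host port and a container hostname.
	script := renderStarterScript(apacheStarterScript, "PLAINTEXT://localhost:55001", "kafka-1")
	fmt.Println(script)
}
```

A custom script passed through WithStarterScript would presumably need to set KAFKA_ADVERTISED_LISTENERS in a comparable way, which is why that option comes with a compatibility warning.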
ctx := context.Background()
kafkaContainer, err := kafka.Run(ctx,
// the image might be different, for example
// custom-registry/apache/kafka-native:4.0.1,
// in which case the starter script would not
// be correctly inferred, and should be overridden
"apache/kafka-native:4.0.1",
kafka.WithClusterID("test-cluster"),
// this explicitly sets the starter script to use
// the one compatible with Apache images
kafka.WithApacheFlavor(),
)
Container Options¶
When starting the Kafka container, you can pass options in a variadic way to configure it.
WithApacheFlavor/WithConfluentFlavor¶
- Not available until the next release main
You can manually specify which flavor of starter script to use with the following options:
ctx := context.Background()
kafkaContainer, err := kafka.Run(ctx,
// the image might be different, for example
// custom-registry/apache/kafka-native:4.0.1,
// in which case the starter script would not
// be correctly inferred, and should be overridden
"apache/kafka-native:4.0.1",
kafka.WithClusterID("test-cluster"),
// this explicitly sets the starter script to use
// the one compatible with Apache images
kafka.WithApacheFlavor(),
)
ctx := context.Background()
kafkaContainer, err := kafka.Run(ctx,
// the image might be different, for example
// custom-registry/confluentinc/confluent-local:7.5.0,
// in which case the starter script might not
// be correctly inferred, and should be overridden
"confluentinc/confluent-local:7.5.0",
kafka.WithClusterID("test-cluster"),
// this explicitly sets the starter script to use
// the one compatible with Confluent images
kafka.WithConfluentFlavor(),
)
Note that WithApacheFlavor and WithConfluentFlavor conflict with each other and with the WithStarterScript option. An error is returned if more than one of these options is provided.
WithStarterScript¶
- Not available until the next release main
This option lets you provide a completely custom starter script for the Kafka container. Use it with care, as compatibility with any particular image or module version cannot be guaranteed.
Note that WithStarterScript conflicts with the WithApacheFlavor and WithConfluentFlavor options. An error is returned if more than one of these options is provided.
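The mutual exclusion between the three options can be pictured with a small sketch. It is not the module's implementation: the `starterChoice` type, its fields, and the error value are hypothetical and exist only to illustrate the rule that at most one of the options may be set.

```go
package main

import (
	"errors"
	"fmt"
)

// starterChoice records which starter script options were requested.
// Hypothetical type, used only in this sketch.
type starterChoice struct {
	apacheFlavor    bool   // set by an option like WithApacheFlavor
	confluentFlavor bool   // set by an option like WithConfluentFlavor
	customScript    string // set by an option like WithStarterScript
}

var errConflictingOptions = errors.New(
	"only one of WithApacheFlavor, WithConfluentFlavor, WithStarterScript may be set")

// validate reports an error when more than one option was provided.
func (c starterChoice) validate() error {
	set := 0
	if c.apacheFlavor {
		set++
	}
	if c.confluentFlavor {
		set++
	}
	if c.customScript != "" {
		set++
	}
	if set > 1 {
		return errConflictingOptions
	}
	return nil
}

func main() {
	ok := starterChoice{apacheFlavor: true}
	bad := starterChoice{apacheFlavor: true, customScript: "#!/bin/bash\necho hi"}

	fmt.Println("single option:", ok.validate())
	fmt.Println("two options:", bad.validate())
}
```

Setting none of the options is valid in this sketch, which corresponds to letting the module infer the script from the image name.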
The following options are exposed by the testcontainers package.
Basic Options¶
- WithExposedPorts (Since v0.37.0)
- WithEnv (Since v0.29.0)
- WithWaitStrategy (Since v0.20.0)
- WithAdditionalWaitStrategy (Since v0.38.0)
- WithWaitStrategyAndDeadline (Since v0.20.0)
- WithAdditionalWaitStrategyAndDeadline (Since v0.38.0)
- WithEntrypoint (Since v0.37.0)
- WithEntrypointArgs (Since v0.37.0)
- WithCmd (Since v0.37.0)
- WithCmdArgs (Since v0.37.0)
- WithLabels (Since v0.37.0)
Lifecycle Options¶
- WithLifecycleHooks (Since v0.38.0)
- WithAdditionalLifecycleHooks (Since v0.38.0)
- WithStartupCommand (Since v0.25.0)
- WithAfterReadyCommand (Since v0.28.0)
Files & Mounts Options¶
- WithFiles (Since v0.37.0)
- WithMounts (Since v0.37.0)
- WithTmpfs (Since v0.37.0)
- WithImageMount (Since v0.37.0)
Build Options¶
- WithDockerfile (Since v0.37.0)
Logging Options¶
- WithLogConsumers (Since v0.28.0)
- WithLogConsumerConfig (Since v0.38.0)
- WithLogger (Since v0.29.0)
Image Options¶
- WithAlwaysPull (Since v0.38.0)
- WithImageSubstitutors (Since v0.26.0)
- WithImagePlatform (Since v0.38.0)
Networking Options¶
- WithNetwork (Since v0.27.0)
- WithNetworkByName (Since v0.38.0)
- WithBridgeNetwork (Since v0.38.0)
- WithNewNetwork (Since v0.27.0)
Advanced Options¶
- WithHostPortAccess (Since v0.31.0)
- WithConfigModifier (Since v0.20.0)
- WithHostConfigModifier (Since v0.20.0)
- WithEndpointSettingsModifier (Since v0.20.0)
- CustomizeRequest (Since v0.20.0)
- WithName (Since v0.38.0)
- WithNoStart (Since v0.38.0)
- WithProvider (Since v0.39.0)
Experimental Options¶
- WithReuseByName (Since v0.37.0)
Container Methods¶
The Kafka container exposes the following methods:
Brokers¶
- Since v0.24.0
The Brokers(ctx) method returns the Kafka brokers as a string slice. Each entry contains the container host and the random host port mapped to Kafka's public port (9093/tcp).
brokers, err := kafkaContainer.Brokers(ctx)
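Many Kafka clients take their bootstrap servers as a single comma-separated string. The sketch below shows one way to turn the returned slice into that form, assuming each entry is a host:port pair. The `bootstrapServers` helper is a hypothetical name, and the sample address stands in for whatever Brokers(ctx) returns at runtime.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// bootstrapServers validates that every broker is a host:port pair and
// joins them into a comma-separated list.
// Hypothetical helper, not part of the Kafka module.
func bootstrapServers(brokers []string) (string, error) {
	if len(brokers) == 0 {
		return "", fmt.Errorf("no brokers provided")
	}
	for _, b := range brokers {
		if _, _, err := net.SplitHostPort(b); err != nil {
			return "", fmt.Errorf("invalid broker address %q: %w", b, err)
		}
	}
	return strings.Join(brokers, ","), nil
}

func main() {
	// Illustrative value; in a test this slice would come from
	// kafkaContainer.Brokers(ctx).
	brokers := []string{"localhost:55001"}

	servers, err := bootstrapServers(brokers)
	if err != nil {
		panic(err)
	}
	fmt.Println("bootstrap servers:", servers)
}
```

Validating the addresses up front gives a clearer failure than a client connection timeout if the slice is empty or malformed.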
Localhost listener¶
- Not available until the next release main
By default, the Kafka container is configured with localhost:9095 as one of its advertised listeners. This listener is useful when you need to run CLI commands inside the container, for example in custom wait strategies or to prepare test data.
Here is an example that uses a custom wait strategy to check that listing topics works:
kafkaContainer, err := kafka.Run(ctx, "apache/kafka:4.0.1",
testcontainers.WithWaitStrategy(
wait.NewExecStrategy([]string{
"/opt/kafka/bin/kafka-topics.sh",
"--bootstrap-server",
"localhost:9095",
"--list",
}).
WithExitCode(0).
WithPollInterval(2*time.Second).
WithStartupTimeout(120*time.Second),
),
kafka.WithClusterID("test-cluster"),
)
Note: this will not work with apache/kafka-native images, as they do not include CLI tools.