public class StreamsConfig extends AbstractConfig
KafkaStreams instance.
Can also be use to configure the Kafka Streams internal KafkaConsumer and KafkaProducer.
To avoid consumer/producer property conflicts, you should prefix those properties using
consumerPrefix(String) and producerPrefix(String), respectively.
Example:
// potentially wrong: sets "metadata.max.age.ms" to 1 minute for producer AND consumer
Properties streamsProperties = new Properties();
streamsProperties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// or
streamsProperties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 60000);
// suggested:
Properties streamsProperties = new Properties();
// sets "metadata.max.age.ms" to 1 minute for consumer only
streamsProperties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 60000);
// sets "metadata.max.age.ms" to 1 minute for producer only
streamsProperties.put(StreamsConfig.producerPrefix(ProducerConfig.METADATA_MAX_AGE_CONFIG), 60000);
StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
Kafka Streams required to set at least properties "application.id" and
"bootstrap.servers".
Furthermore, it is not allowed to enable "enable.auto.commit" that
is disabled by Kafka Streams by default.| Modifier and Type | Class and Description |
|---|---|
static class |
StreamsConfig.InternalConfig |
| Modifier and Type | Field and Description |
|---|---|
static String |
APPLICATION_ID_CONFIG
application.id |
static String |
APPLICATION_SERVER_CONFIG
user.endpoint |
static String |
AT_LEAST_ONCE
Config value for parameter
"processing.guarantee" for at-least-once processing guarantees. |
static String |
BOOTSTRAP_SERVERS_CONFIG
bootstrap.servers |
static String |
BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partition |
static String |
CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.buffering |
static String |
CLIENT_ID_CONFIG
client.id |
static String |
COMMIT_INTERVAL_MS_CONFIG
commit.interval.ms |
static String |
CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.ms |
static String |
CONSUMER_PREFIX
|
static String |
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handler |
static String |
DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serde |
static String |
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default timestamp.extractor |
static String |
DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde |
static String |
EXACTLY_ONCE
Config value for parameter
"processing.guarantee" for exactly-once processing guarantees. |
static String |
KEY_SERDE_CLASS_CONFIG
Deprecated.
Use
DEFAULT_KEY_SERDE_CLASS_CONFIG instead. |
static String |
METADATA_MAX_AGE_CONFIG
metadata.max.age.ms |
static String |
METRIC_REPORTER_CLASSES_CONFIG
metric.reporters |
static String |
METRICS_NUM_SAMPLES_CONFIG
metrics.num.samples |
static String |
METRICS_RECORDING_LEVEL_CONFIG
metrics.record.level |
static String |
METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.ms |
static String |
NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicas |
static String |
NUM_STREAM_THREADS_CONFIG
num.stream.threads |
static String |
PARTITION_GROUPER_CLASS_CONFIG
partition.grouper |
static String |
POLL_MS_CONFIG
poll.ms |
static String |
PROCESSING_GUARANTEE_CONFIG
cache.max.bytes.buffering |
static String |
PRODUCER_PREFIX
|
static String |
RECEIVE_BUFFER_CONFIG
receive.buffer.bytes |
static String |
RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.max |
static String |
RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.ms |
static String |
REPLICATION_FACTOR_CONFIG
replication.factor |
static String |
REQUEST_TIMEOUT_MS_CONFIG
request.timeout.ms |
static String |
RETRY_BACKOFF_MS_CONFIG
retry.backoff.ms |
static String |
ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setter |
static String |
SECURITY_PROTOCOL_CONFIG
security.protocol |
static String |
SEND_BUFFER_CONFIG
send.buffer.bytes |
static String |
STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delay |
static String |
STATE_DIR_CONFIG
state.dir |
static String |
TIMESTAMP_EXTRACTOR_CLASS_CONFIG
Deprecated.
Use
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG instead. |
static String |
TOPIC_PREFIX
Prefix used to provide default topic configs to be applied when creating internal topics.
|
static String |
VALUE_SERDE_CLASS_CONFIG
Deprecated.
Use
DEFAULT_VALUE_SERDE_CLASS_CONFIG instead. |
static String |
WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms |
static String |
ZOOKEEPER_CONNECT_CONFIG
Deprecated.
Kakfa Streams does not use Zookeeper anymore and this parameter will be ignored.
|
| Constructor and Description |
|---|
StreamsConfig(Map<?,?> props)
Create a new
StreamsConfig using the given properties. |
| Modifier and Type | Method and Description |
|---|---|
static ConfigDef |
configDef()
Return a copy of the config definition.
|
static String |
consumerPrefix(String consumerProp)
Prefix a property with
CONSUMER_PREFIX. |
DeserializationExceptionHandler |
defaultDeserializationExceptionHandler() |
Serde |
defaultKeySerde()
Return an
configured instance of key Serde
class. |
TimestampExtractor |
defaultTimestampExtractor() |
Serde |
defaultValueSerde()
Return an
configured instance of value
Serde class. |
Map<String,Object> |
getConsumerConfigs(StreamThread streamThread,
String groupId,
String clientId)
Get the configs to the
consumer. |
Map<String,Object> |
getProducerConfigs(String clientId)
Get the configs for the
producer. |
Map<String,Object> |
getRestoreConsumerConfigs(String clientId)
Get the configs for the
restore-consumer. |
Serde |
keySerde()
Deprecated.
|
static void |
main(String[] args) |
protected Map<String,Object> |
postProcessParsedConfig(Map<String,Object> parsedValues)
Called directly after user configs got parsed (and thus default values got set).
|
static String |
producerPrefix(String producerProp)
Prefix a property with
PRODUCER_PREFIX. |
static String |
topicPrefix(String topicProp)
Prefix a property with
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics. |
Serde |
valueSerde()
Deprecated.
|
equals, get, getBoolean, getClass, getConfiguredInstance, getConfiguredInstances, getConfiguredInstances, getDouble, getInt, getList, getLong, getPassword, getShort, getString, hashCode, ignore, logUnused, originals, originalsStrings, originalsWithPrefix, typeOf, unused, values, valuesWithPrefixOverridepublic static final String CONSUMER_PREFIX
consumer configs from producer configs.
It is recommended to use consumerPrefix(String) to add this prefix to consumer
properties.public static final String TOPIC_PREFIX
TopicConfig.
It is recommended to use topicPrefix(String).public static final String PRODUCER_PREFIX
producer configs from consumer configs.
It is recommended to use producerPrefix(String) to add this prefix to producer
properties.public static final String AT_LEAST_ONCE
"processing.guarantee" for at-least-once processing guarantees.public static final String EXACTLY_ONCE
"processing.guarantee" for exactly-once processing guarantees.public static final String APPLICATION_ID_CONFIG
application.idpublic static final String APPLICATION_SERVER_CONFIG
user.endpointpublic static final String BOOTSTRAP_SERVERS_CONFIG
bootstrap.serverspublic static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG
buffered.records.per.partitionpublic static final String CACHE_MAX_BYTES_BUFFERING_CONFIG
cache.max.bytes.bufferingpublic static final String CLIENT_ID_CONFIG
client.idpublic static final String COMMIT_INTERVAL_MS_CONFIG
commit.interval.mspublic static final String CONNECTIONS_MAX_IDLE_MS_CONFIG
connections.max.idle.mspublic static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
default.deserialization.exception.handlerpublic static final String DEFAULT_KEY_SERDE_CLASS_CONFIG
default key.serdepublic static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
default timestamp.extractorpublic static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG
default value.serde@Deprecated public static final String KEY_SERDE_CLASS_CONFIG
DEFAULT_KEY_SERDE_CLASS_CONFIG instead.key.serdepublic static final String METADATA_MAX_AGE_CONFIG
metadata.max.age.mspublic static final String METRICS_NUM_SAMPLES_CONFIG
metrics.num.samplespublic static final String METRICS_RECORDING_LEVEL_CONFIG
metrics.record.levelpublic static final String METRIC_REPORTER_CLASSES_CONFIG
metric.reporterspublic static final String METRICS_SAMPLE_WINDOW_MS_CONFIG
metrics.sample.window.mspublic static final String NUM_STANDBY_REPLICAS_CONFIG
num.standby.replicaspublic static final String NUM_STREAM_THREADS_CONFIG
num.stream.threadspublic static final String PARTITION_GROUPER_CLASS_CONFIG
partition.grouperpublic static final String POLL_MS_CONFIG
poll.mspublic static final String PROCESSING_GUARANTEE_CONFIG
cache.max.bytes.bufferingpublic static final String RECEIVE_BUFFER_CONFIG
receive.buffer.bytespublic static final String RECONNECT_BACKOFF_MS_CONFIG
reconnect.backoff.mspublic static final String RECONNECT_BACKOFF_MAX_MS_CONFIG
reconnect.backoff.maxpublic static final String REPLICATION_FACTOR_CONFIG
replication.factorpublic static final String REQUEST_TIMEOUT_MS_CONFIG
request.timeout.mspublic static final String RETRY_BACKOFF_MS_CONFIG
retry.backoff.mspublic static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG
rocksdb.config.setterpublic static final String SECURITY_PROTOCOL_CONFIG
security.protocolpublic static final String SEND_BUFFER_CONFIG
send.buffer.bytespublic static final String STATE_CLEANUP_DELAY_MS_CONFIG
state.cleanup.delaypublic static final String STATE_DIR_CONFIG
state.dir@Deprecated public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG instead.timestamp.extractor@Deprecated public static final String VALUE_SERDE_CLASS_CONFIG
DEFAULT_VALUE_SERDE_CLASS_CONFIG instead.value.serdepublic static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG
windowstore.changelog.additional.retention.ms@Deprecated public static final String ZOOKEEPER_CONNECT_CONFIG
zookeeper.connectpublic StreamsConfig(Map<?,?> props)
StreamsConfig using the given properties.props - properties that specify Kafka Streams and internal consumer/producer configurationpublic static String consumerPrefix(String consumerProp)
CONSUMER_PREFIX. This is used to isolate consumer configs
from producer configs.consumerProp - the consumer property to be maskedCONSUMER_PREFIX + consumerProppublic static String producerPrefix(String producerProp)
PRODUCER_PREFIX. This is used to isolate producer configs
from consumer configs.producerProp - the producer property to be maskedproducerProppublic static String topicPrefix(String topicProp)
TOPIC_PREFIX
used to provide default topic configs to be applied when creating internal topics.topicProp - the topic property to be maskedtopicProppublic static ConfigDef configDef()
protected Map<String,Object> postProcessParsedConfig(Map<String,Object> parsedValues)
AbstractConfigpostProcessParsedConfig in class AbstractConfigparsedValues - unmodifiable map of current configurationpublic Map<String,Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException
consumer.
Properties using the prefix CONSUMER_PREFIX will be used in favor over their non-prefixed versions
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.streamThread - the StreamThread creating a consumergroupId - consumer groupIdclientId - clientIdConfigException - if "enable.auto.commit" was set to false by the userpublic Map<String,Object> getRestoreConsumerConfigs(String clientId) throws ConfigException
restore-consumer.
Properties using the prefix CONSUMER_PREFIX will be used in favor over their non-prefixed versions
except in the case of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.clientId - clientIdConfigException - if "enable.auto.commit" was set to false by the userpublic Map<String,Object> getProducerConfigs(String clientId)
producer.
Properties using the prefix PRODUCER_PREFIX will be used in favor over their non-prefixed versions
except in the case of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG where we always use the non-prefixed
version as we only support reading/writing from/to the same Kafka Cluster.clientId - clientId@Deprecated public Serde keySerde()
configured instance of key Serde
class. This method is deprecated. Use defaultKeySerde() method instead.public Serde defaultKeySerde()
configured instance of key Serde
class.@Deprecated public Serde valueSerde()
configured instance of value
Serde class. This method is deprecated. Use defaultValueSerde() instead.public Serde defaultValueSerde()
configured instance of value
Serde class.public TimestampExtractor defaultTimestampExtractor()
public DeserializationExceptionHandler defaultDeserializationExceptionHandler()
public static void main(String[] args)