public class StreamsBuilder extends Object
StreamsBuilder provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.Topology,
KStream,
KTable,
GlobalKTable| Constructor and Description |
|---|
StreamsBuilder() |
| Modifier and Type | Method and Description |
|---|---|
StreamsBuilder |
addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier,
String sourceName,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String topic,
String processorName,
ProcessorSupplier stateUpdateSupplier)
Adds a global
StateStore to the topology. |
StreamsBuilder |
addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier,
String sourceName,
TimestampExtractor timestampExtractor,
Deserializer keyDeserializer,
Deserializer valueDeserializer,
String topic,
String processorName,
ProcessorSupplier stateUpdateSupplier)
Adds a global
StateStore to the topology. |
StreamsBuilder |
addStateStore(StateStoreSupplier supplier,
String... processorNames)
Adds a state store to the underlying
Topology. |
Topology |
build()
Returns the
Topology that represents the specified processing logic. |
<K,V> GlobalKTable<K,V> |
globalTable(Serde<K> keySerde,
Serde<V> valueSerde,
String topic)
Create a
GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> |
globalTable(Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
StateStoreSupplier<KeyValueStore> storeSupplier)
Create a
GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> |
globalTable(Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
String queryableStoreName)
Create a
GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> |
globalTable(Serde<K> keySerde,
Serde<V> valueSerde,
TimestampExtractor timestampExtractor,
String topic,
String queryableStoreName)
Create a
GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> |
globalTable(String topic)
Create a
GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> |
globalTable(String topic,
String queryableStoreName)
Create a
GlobalKTable for the specified topic. |
<K,V> KStream<K,V> |
merge(KStream<K,V>... streams)
|
<K,V> KStream<K,V> |
stream(Pattern topicPattern)
Create a
KStream from the specified topic pattern. |
<K,V> KStream<K,V> |
stream(Serde<K> keySerde,
Serde<V> valueSerde,
Pattern topicPattern)
Create a
KStream from the specified topic pattern. |
<K,V> KStream<K,V> |
stream(Serde<K> keySerde,
Serde<V> valueSerde,
String... topics)
Create a
KStream from the specified topics. |
<K,V> KStream<K,V> |
stream(String... topics)
Create a
KStream from the specified topics. |
<K,V> KStream<K,V> |
stream(TimestampExtractor timestampExtractor,
Serde<K> keySerde,
Serde<V> valueSerde,
Pattern topicPattern)
Create a
KStream from the specified topic pattern. |
<K,V> KStream<K,V> |
stream(TimestampExtractor timestampExtractor,
Serde<K> keySerde,
Serde<V> valueSerde,
String... topics)
Create a
KStream from the specified topics. |
<K,V> KStream<K,V> |
stream(Topology.AutoOffsetReset offsetReset,
Pattern topicPattern)
Create a
KStream from the specified topic pattern. |
<K,V> KStream<K,V> |
stream(Topology.AutoOffsetReset offsetReset,
Serde<K> keySerde,
Serde<V> valueSerde,
Pattern topicPattern)
Create a
KStream from the specified topic pattern. |
<K,V> KStream<K,V> |
stream(Topology.AutoOffsetReset offsetReset,
Serde<K> keySerde,
Serde<V> valueSerde,
String... topics)
Create a
KStream from the specified topics. |
<K,V> KStream<K,V> |
stream(Topology.AutoOffsetReset offsetReset,
String... topics)
Create a
KStream from the specified topics. |
<K,V> KStream<K,V> |
stream(Topology.AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor,
Serde<K> keySerde,
Serde<V> valueSerde,
Pattern topicPattern)
Create a
KStream from the specified topic pattern. |
<K,V> KStream<K,V> |
stream(Topology.AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor,
Serde<K> keySerde,
Serde<V> valueSerde,
String... topics)
Create a
KStream from the specified topics. |
<K,V> KTable<K,V> |
table(Serde<K> keySerde,
Serde<V> valueSerde,
String topic)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
StateStoreSupplier<KeyValueStore> storeSupplier)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(String topic)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(String topic,
StateStoreSupplier<KeyValueStore> storeSupplier)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(TimestampExtractor timestampExtractor,
Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(TimestampExtractor timestampExtractor,
String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
Serde<K> keySerde,
Serde<V> valueSerde,
String topic)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
String topic)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
String topic,
StateStoreSupplier<KeyValueStore> storeSupplier)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor,
Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
StateStoreSupplier<KeyValueStore> storeSupplier)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor,
Serde<K> keySerde,
Serde<V> valueSerde,
String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
<K,V> KTable<K,V> |
table(Topology.AutoOffsetReset offsetReset,
TimestampExtractor timestampExtractor,
String topic,
String queryableStoreName)
Create a
KTable for the specified topic. |
public <K,V> KStream<K,V> stream(String... topics)
KStream from the specified topics.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value
deserializers as specified in the config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
topics - the topic names; must contain at least one topic nameKStream for the specified topicspublic <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, String... topics)
KStream from the specified topics.
The default TimestampExtractor and default key and value deserializers as specified in the
config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
offsetReset - the "auto.offset.reset" policy to use for the specified topics if no valid committed
offsets are availabletopics - the topic names; must contain at least one topic nameKStream for the specified topicspublic <K,V> KStream<K,V> stream(Pattern topicPattern)
KStream from the specified topic pattern.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value
deserializers as specified in the config are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
topicPattern - the pattern to match for topic namesKStream for topics matching the regex pattern.public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, Pattern topicPattern)
KStream from the specified topic pattern.
The default TimestampExtractor and default key and value deserializers as specified in the
config are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
offsetReset - the "auto.offset.reset" policy to use for the matched topics if no valid committed
offsets are availabletopicPattern - the pattern to match for topic namesKStream for topics matching the regex pattern.public <K,V> KStream<K,V> stream(Serde<K> keySerde, Serde<V> valueSerde, String... topics)
KStream from the specified topics.
The default "auto.offset.reset" strategy and default TimestampExtractor as specified in the
config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
keySerde - key serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopics - the topic names; must contain at least one topic nameKStream for the specified topicspublic <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String... topics)
KStream from the specified topics.
The default TimestampExtractor as specified in the config is used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
offsetReset - the "auto.offset.reset" policy to use for the specified topics if no valid committed
offsets are availablekeySerde - key serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopics - the topic names; must contain at least one topic nameKStream for the specified topicspublic <K,V> KStream<K,V> stream(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String... topics)
KStream from the specified topics.
The default "auto.offset.reset" strategy as specified in the config is used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
timestampExtractor - the stateless timestamp extractor used for this source KStream,
if not specified the default extractor defined in the configs will be usedkeySerde - key serde used to read this source KStream, if not specified the default
serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopics - the topic names; must contain at least one topic nameKStream for the specified topicspublic <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String... topics)
KStream from the specified topics.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
offsetReset - the "auto.offset.reset" policy to use for the specified topics
if no valid committed offsets are availabletimestampExtractor - the stateless timestamp extractor used for this source KStream,
if not specified the default extractor defined in the configs will be usedkeySerde - key serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopics - the topic names; must contain at least one topic nameKStream for the specified topicspublic <K,V> KStream<K,V> stream(Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
KStream from the specified topic pattern.
The default "auto.offset.reset" strategy and default TimestampExtractor
as specified in the config are used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
keySerde - key serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopicPattern - the pattern to match for topic namesKStream for topics matching the regex pattern.public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
KStream from the specified topic pattern.
The default TimestampExtractor as specified in the config is used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
offsetReset - the "auto.offset.reset" policy to use for the matched topics if no valid committed
offsets are availablekeySerde - key serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopicPattern - the pattern to match for topic namesKStream for topics matching the regex pattern.public <K,V> KStream<K,V> stream(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
KStream from the specified topic pattern.
The default "auto.offset.reset" strategy as specified in the config is used.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
timestampExtractor - the stateless timestamp extractor used for this source KStream,
if not specified the default extractor defined in the configs will be usedkeySerde - key serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopicPattern - the pattern to match for topic namesKStream for topics matching the regex pattern.public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
KStream from the specified topic pattern.
If multiple topics are matched by the specified pattern, the created KStream will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the date before any key based operation
(like aggregation or join) is applied to the returned KStream.
offsetReset - the "auto.offset.reset" policy to use for the matched topics if no valid
committed offsets are availabletimestampExtractor - the stateless timestamp extractor used for this source KStream,
if not specified the default extractor defined in the configs will be usedkeySerde - key serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedvalueSerde - value serde used to read this source KStream,
if not specified the default serde defined in the configs will be usedtopicPattern - the pattern to match for topic namesKStream for topics matching the regex pattern.public <K,V> KTable<K,V> table(String topic, String queryableStoreName)
KTable for the specified topic.
The default "auto.offset.reset" strategy, default TimestampExtractor, and
default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.topic - the topic name; cannot be nullqueryableStoreName - the state store name; if null this is the equivalent of table(String)KTable for the specified topicpublic <K,V> KTable<K,V> table(String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
KTable for the specified topic.
The default "auto.offset.reset" strategy and default key and value deserializers as specified in the
config are used.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.topic - the topic name; cannot be nullstoreSupplier - user defined state store supplier; cannot be nullKTable for the specified topicpublic <K,V> KTable<K,V> table(String topic)
KTable for the specified topic.
The default "auto.offset.reset" strategy and default key and value deserializers as specified in the
config are used.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal
store name. Note that that store name may not be queriable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
topic - the topic name; cannot be nullKTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, String topic, String queryableStoreName)
KTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed
offsets are availabletopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null this is the equivalent of
table(Topology.AutoOffsetReset, String)KTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
KTable for the specified topic.
The default TimestampExtractor and default key and value deserializers
as specified in the config are used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed
offsets are availabletopic - the topic name; cannot be nullstoreSupplier - user defined state store supplier; cannot be nullKTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, String topic)
KTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal
store name. Note that that store name may not be queriable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed
offsets are availabletopic - the topic name; cannot be nullKTable for the specified topicpublic <K,V> KTable<K,V> table(TimestampExtractor timestampExtractor, String topic, String queryableStoreName)
KTable for the specified topic.
The default "auto.offset.reset" strategy and default key and value deserializers
as specified in the config are used.
Input KeyValue pairs with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
storeName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.timestampExtractor - the stateless timestamp extractor used for this source KTable,
if not specified the default extractor defined in the configs will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenKTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String topic, String queryableStoreName)
KTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input KeyValue pairs with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
storeName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed
offsets are availabletimestampExtractor - the stateless timestamp extractor used for this source KTable,
if not specified the default extractor defined in the configs will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenKTable for the specified topicpublic <K,V> KTable<K,V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
KTable for the specified topic.
The default "auto.offset.reset" strategy and default TimestampExtractor
as specified in the config are used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.keySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenKTable for the specified topicpublic <K,V> KTable<K,V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
KTable for the specified topic.
The default TimestampExtractor as specified in the config is used.
The default "auto.offset.reset" strategy as specified in the config is used.
Input records with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.keySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullstoreSupplier - user defined state store supplier; cannot be nullKTable for the specified topicpublic <K,V> KTable<K,V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic)
KTable for the specified topic.
The default "auto.offset.reset" strategy as specified in the config is used.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal
store name. Note that that store name may not be queriable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
keySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullKTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
KTable for the specified topic.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid
committed offsets are availablekeySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenKTable for the specified topicpublic <K,V> KTable<K,V> table(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
KTable for the specified topic.
The default "auto.offset.reset" strategy as specified in the config is used.
Input KeyValue pairs with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
storeName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.timestampExtractor - the stateless timestamp extractor used for this source KTable,
if not specified the default extractor defined in the configs will be usedkeySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenKTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
KTable for the specified topic.
Input KeyValue pairs with null key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
storeName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid
committed offsets are availabletimestampExtractor - the stateless timestamp extractor used for this source KTable,
if not specified the default extractor defined in the configs will be usedkeySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenKTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic)
KTable for the specified topic.
The default "auto.offset.reset" strategy as specified in the config is used.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with an internal
store name. Note that that store name may not be queriable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed
offsets are availablekeySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullKTable for the specified topicpublic <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
KTable for the specified topic.
Input records with null key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable will be corrupted.
The resulting KTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to
query the value of the key on a parallel running instance of your Kafka Streams application.offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed
offsets are availabletimestampExtractor - the stateless timestamp extractor used for this source KTable,
if not specified the default extractor defined in the configs will be usedkeySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullstoreSupplier - user defined state store supplier; cannot be nullKTable for the specified topicpublic <K,V> GlobalKTable<K,V> globalTable(String topic, String queryableStoreName)
GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.topic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenGlobalKTable for the specified topicpublic <K,V> GlobalKTable<K,V> globalTable(String topic)
GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal
store name. Note that that store name may not be queriable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.
topic - the topic name; cannot be nullGlobalKTable for the specified topicpublic <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, TimestampExtractor timestampExtractor, String topic, String queryableStoreName)
GlobalKTable for the specified topic.
The default TimestampExtractor and default key and value deserializers as specified in
the config are used.
Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.keySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtimestampExtractor - the stateless timestamp extractor used for this source KTable,
if not specified the default extractor defined in the configs will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenGlobalKTable for the specified topicpublic <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
GlobalKTable for the specified topic.
The default TimestampExtractor as specified in the config is used.
Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.keySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullstoreSupplier - user defined state store supplier; Cannot be nullGlobalKTable for the specified topicpublic <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
GlobalKTable for the specified topic.
The default TimestampExtractor as specified in the config is used.
Input KeyValue pairs with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with the given
queryableStoreName.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
To query the local KeyValueStore it must be obtained via
KafkaStreams#store(...):
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.keySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullqueryableStoreName - the state store name; if null an internal store name will be automatically givenGlobalKTable for the specified topicpublic <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic)
GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config are used.
Input records with null key will be dropped.
The resulting GlobalKTable will be materialized in a local KeyValueStore with an internal
store name. Note that that store name may not be queriable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream and KGroupedTable that return a KTable).
Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest"
regardless of the specified value in StreamsConfig.
keySerde - key serde used to send key-value pairs,
if not specified the default key serde defined in the configuration will be usedvalueSerde - value serde used to send key-value pairs,
if not specified the default value serde defined in the configuration will be usedtopic - the topic name; cannot be nullGlobalKTable for the specified topicpublic StreamsBuilder addStateStore(StateStoreSupplier supplier, String... processorNames)
Topology.supplier - the supplier used to obtain this state store StateStore instanceprocessorNames - the names of the processors that should be able to access the provided storeTopologyException - if state store supplier is already addedpublic StreamsBuilder addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier)
StateStore to the topology.
The StateStore sources its data from all partitions of the provided input topic.
There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions
of the input topic.
The provided ProcessorSupplier will be used to create an ProcessorNode that will receive all
records forwarded from the SourceNode.
This ProcessorNode should be used to keep the StateStore up-to-date.
The default TimestampExtractor as specified in the config is used.
storeSupplier - user defined state store suppliersourceName - name of the SourceNode that will be automatically addedkeyDeserializer - the Deserializer to deserialize keys withvalueDeserializer - the Deserializer to deserialize values withtopic - the topic to source the data fromprocessorName - the name of the ProcessorSupplierstateUpdateSupplier - the instance of ProcessorSupplierTopologyException - if the processor of state is already registeredpublic StreamsBuilder addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier)
StateStore to the topology.
The StateStore sources its data from all partitions of the provided input topic.
There will be exactly one instance of this StateStore per Kafka Streams instance.
A SourceNode with the provided sourceName will be added to consume the data arriving from the partitions
of the input topic.
The provided ProcessorSupplier will be used to create an ProcessorNode that will receive all
records forwarded from the SourceNode.
This ProcessorNode should be used to keep the StateStore up-to-date.
storeSupplier - user defined state store suppliersourceName - name of the SourceNode that will be automatically addedtimestampExtractor - the stateless timestamp extractor used for this source,
if not specified the default extractor defined in the configs will be usedkeyDeserializer - the Deserializer to deserialize keys withvalueDeserializer - the Deserializer to deserialize values withtopic - the topic to source the data fromprocessorName - the name of the ProcessorSupplierstateUpdateSupplier - the instance of ProcessorSupplierTopologyException - if the processor of state is already registered