public class StreamsBuilder extends Object

StreamsBuilder provides the high-level Kafka Streams DSL to specify a Kafka Streams topology.

See also: Topology, KStream, KTable, GlobalKTable
Constructor and Description |
---|
StreamsBuilder() |
Modifier and Type | Method and Description |
---|---|
StreamsBuilder | addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier) Adds a global StateStore to the topology. |
StreamsBuilder | addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier) Adds a global StateStore to the topology. |
StreamsBuilder | addStateStore(StateStoreSupplier supplier, String... processorNames) Adds a state store to the underlying Topology. |
Topology | build() Returns the Topology that represents the specified processing logic. |
<K,V> GlobalKTable<K,V> | globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic) Create a GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> | globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier) Create a GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> | globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName) Create a GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> | globalTable(Serde<K> keySerde, Serde<V> valueSerde, TimestampExtractor timestampExtractor, String topic, String queryableStoreName) Create a GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> | globalTable(String topic) Create a GlobalKTable for the specified topic. |
<K,V> GlobalKTable<K,V> | globalTable(String topic, String queryableStoreName) Create a GlobalKTable for the specified topic. |
<K,V> KStream<K,V> | merge(KStream<K,V>... streams) |
<K,V> KStream<K,V> | stream(Pattern topicPattern) Create a KStream from the specified topic pattern. |
<K,V> KStream<K,V> | stream(Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern) Create a KStream from the specified topic pattern. |
<K,V> KStream<K,V> | stream(Serde<K> keySerde, Serde<V> valueSerde, String... topics) Create a KStream from the specified topics. |
<K,V> KStream<K,V> | stream(String... topics) Create a KStream from the specified topics. |
<K,V> KStream<K,V> | stream(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern) Create a KStream from the specified topic pattern. |
<K,V> KStream<K,V> | stream(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String... topics) Create a KStream from the specified topics. |
<K,V> KStream<K,V> | stream(Topology.AutoOffsetReset offsetReset, Pattern topicPattern) Create a KStream from the specified topic pattern. |
<K,V> KStream<K,V> | stream(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern) Create a KStream from the specified topic pattern. |
<K,V> KStream<K,V> | stream(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String... topics) Create a KStream from the specified topics. |
<K,V> KStream<K,V> | stream(Topology.AutoOffsetReset offsetReset, String... topics) Create a KStream from the specified topics. |
<K,V> KStream<K,V> | stream(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern) Create a KStream from the specified topic pattern. |
<K,V> KStream<K,V> | stream(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String... topics) Create a KStream from the specified topics. |
<K,V> KTable<K,V> | table(Serde<K> keySerde, Serde<V> valueSerde, String topic) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(String topic) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(String topic, StateStoreSupplier<KeyValueStore> storeSupplier) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(String topic, String queryableStoreName) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(TimestampExtractor timestampExtractor, String topic, String queryableStoreName) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, String topic) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, String topic, StateStoreSupplier<KeyValueStore> storeSupplier) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, String topic, String queryableStoreName) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName) Create a KTable for the specified topic. |
<K,V> KTable<K,V> | table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String topic, String queryableStoreName) Create a KTable for the specified topic. |
public <K,V> KStream<K,V> stream(String... topics)
Create a KStream from the specified topics.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics
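A minimal sketch of this overload; the topic names are illustrative and String default serdes are assumed to be configured:

StreamsBuilder builder = new StreamsBuilder();

// Records from both topics are funneled into a single KStream; there is no ordering guarantee across topics.
KStream<String, String> clicks = builder.stream("desktop-clicks", "mobile-clicks");
clicks.foreach((key, value) -> System.out.println(key + " -> " + value));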
public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, String... topics)
Create a KStream from the specified topics.
The default TimestampExtractor
and default key and value deserializers as specified in the
config
are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topics if no valid committed offsets are available
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics

public <K,V> KStream<K,V> stream(Pattern topicPattern)
Create a KStream from the specified topic pattern.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
If multiple topics are matched by the specified pattern, the created KStream
will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern
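A sketch of subscribing by pattern; the pattern is illustrative and String default serdes are assumed:

StreamsBuilder builder = new StreamsBuilder();

// Subscribes to every topic whose name matches the pattern, including matching topics created later.
KStream<String, String> metrics = builder.stream(Pattern.compile("metrics-.*"));
metrics.foreach((key, value) -> System.out.println(key + ": " + value));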
public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, Pattern topicPattern)
Create a KStream from the specified topic pattern.
The default TimestampExtractor
and default key and value deserializers as specified in the
config
are used.
If multiple topics are matched by the specified pattern, the created KStream
will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the matched topics if no valid committed offsets are available
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern

public <K,V> KStream<K,V> stream(Serde<K> keySerde, Serde<V> valueSerde, String... topics)
Create a KStream from the specified topics.
The default "auto.offset.reset"
strategy and default TimestampExtractor
as specified in the
config
are used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics
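A sketch with explicit serdes for a single source; the topic name is illustrative:

StreamsBuilder builder = new StreamsBuilder();

// The explicit serdes override the configured defaults for this source only.
KStream<String, Long> hitCounts = builder.stream(Serdes.String(), Serdes.Long(), "page-hit-counts");
hitCounts.foreach((page, hits) -> System.out.println(page + " was hit " + hits + " times"));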
public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String... topics)
Create a KStream from the specified topics.
The default TimestampExtractor
as specified in the config
is used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topics if no valid committed offsets are available
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics

public <K,V> KStream<K,V> stream(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String... topics)
Create a KStream from the specified topics.
The default "auto.offset.reset"
strategy as specified in the config
is used.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
timestampExtractor - the stateless timestamp extractor used for this source KStream, if not specified the default extractor defined in the configs will be used
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics
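A sketch using a per-source extractor; WallclockTimestampExtractor ships with Kafka Streams, and the topic name is illustrative:

StreamsBuilder builder = new StreamsBuilder();

// Use wall-clock (ingestion) time for this source only; the rest of the topology keeps the default extractor.
KStream<String, String> auditLog = builder.stream(
    new WallclockTimestampExtractor(),
    Serdes.String(),
    Serdes.String(),
    "audit-log");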
public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String... topics)
Create a KStream from the specified topics.
If multiple topics are specified there is no ordering guarantee for records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topics if no valid committed offsets are available
timestampExtractor - the stateless timestamp extractor used for this source KStream, if not specified the default extractor defined in the configs will be used
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topics - the topic names; must contain at least one topic name
Returns:
a KStream for the specified topics

public <K,V> KStream<K,V> stream(Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
Create a KStream from the specified topic pattern.
The default "auto.offset.reset"
strategy and default TimestampExtractor
as specified in the config
are used.
If multiple topics are matched by the specified pattern, the created KStream
will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern

public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
Create a KStream from the specified topic pattern.
The default TimestampExtractor
as specified in the config
is used.
If multiple topics are matched by the specified pattern, the created KStream
will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the matched topics if no valid committed offsets are available
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern

public <K,V> KStream<K,V> stream(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
Create a KStream from the specified topic pattern.
The default "auto.offset.reset"
strategy as specified in the config
is used.
If multiple topics are matched by the specified pattern, the created KStream
will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
timestampExtractor - the stateless timestamp extractor used for this source KStream, if not specified the default extractor defined in the configs will be used
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern

public <K,V> KStream<K,V> stream(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, Pattern topicPattern)
Create a KStream from the specified topic pattern.
If multiple topics are matched by the specified pattern, the created KStream
will read data from all of
them and there is no ordering guarantee between records from different topics.
Note that the specified input topics must be partitioned by key.
If this is not the case it is the user's responsibility to repartition the data before any key-based operation (like aggregation or join) is applied to the returned KStream.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the matched topics if no valid committed offsets are available
timestampExtractor - the stateless timestamp extractor used for this source KStream, if not specified the default extractor defined in the configs will be used
keySerde - key serde used to read this source KStream, if not specified the default serde defined in the configs will be used
valueSerde - value serde used to read this source KStream, if not specified the default serde defined in the configs will be used
topicPattern - the pattern to match for topic names
Returns:
a KStream for topics matching the regex pattern

public <K,V> KTable<K,V> table(String topic, String queryableStoreName)
Create a KTable for the specified topic.
The default "auto.offset.reset" strategy, default TimestampExtractor, and default key and value deserializers as specified in the config are used.
Input records
with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null this is the equivalent of table(String)
Returns:
a KTable for the specified topic
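A sketch of a queryable table; the topic and store names are illustrative and String default serdes are assumed:

StreamsBuilder builder = new StreamsBuilder();

// Keeps the latest value per key from the topic in a local store named "profiles-store",
// which can then be queried via KafkaStreams#store(...) as shown above.
KTable<String, String> profiles = builder.table("user-profiles", "profiles-store");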
public <K,V> KTable<K,V> table(String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
Create a KTable for the specified topic.
The default "auto.offset.reset"
strategy and default key and value deserializers as specified in the
config
are used.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
topic - the topic name; cannot be null
storeSupplier - user defined state store supplier; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(String topic)
Create a KTable for the specified topic.
The default "auto.offset.reset"
strategy and default key and value deserializers as specified in the
config
are used.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
Parameters:
topic - the topic name; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, String topic, String queryableStoreName)
Create a KTable for the specified topic.
The default key and value deserializers as specified in the config
are used.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null this is the equivalent of table(Topology.AutoOffsetReset, String)
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
Create a KTable for the specified topic.
The default TimestampExtractor
and default key and value deserializers
as specified in the config
are used.
Input records
with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
topic - the topic name; cannot be null
storeSupplier - user defined state store supplier; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, String topic)
Create a KTable for the specified topic.
The default key and value deserializers as specified in the config
are used.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
topic - the topic name; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(TimestampExtractor timestampExtractor, String topic, String queryableStoreName)
Create a KTable for the specified topic.
The default "auto.offset.reset"
strategy and default key and value deserializers
as specified in the config
are used.
Input KeyValue
pairs with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
storeName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
timestampExtractor - the stateless timestamp extractor used for this source KTable, if not specified the default extractor defined in the configs will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String topic, String queryableStoreName)
Create a KTable for the specified topic.
The default key and value deserializers as specified in the config
are used.
Input KeyValue
pairs with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
storeName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
timestampExtractor - the stateless timestamp extractor used for this source KTable, if not specified the default extractor defined in the configs will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
Create a KTable for the specified topic.
The default "auto.offset.reset"
strategy and default TimestampExtractor
as specified in the config
are used.
Input records
with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
Create a KTable for the specified topic.
The default TimestampExtractor
as specified in the config
is used.
The default "auto.offset.reset"
strategy as specified in the config
is used.
Input records
with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
storeSupplier - user defined state store supplier; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Serde<K> keySerde, Serde<V> valueSerde, String topic)
Create a KTable for the specified topic.
The default "auto.offset.reset"
strategy as specified in the config
is used.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
Parameters:
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
Returns:
a KTable for the specified topic
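A sketch with explicit serdes; the topic name is illustrative and the table keeps the latest count per item key:

StreamsBuilder builder = new StreamsBuilder();

// The explicit serdes override the configured defaults for this table's source topic.
KTable<String, Long> inventory = builder.table(Serdes.String(), Serdes.Long(), "inventory-counts");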
public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
Create a KTable for the specified topic.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
Create a KTable for the specified topic.
The default "auto.offset.reset"
strategy as specified in the config
is used.
Input KeyValue
pairs with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
storeName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
timestampExtractor - the stateless timestamp extractor used for this source KTable, if not specified the default extractor defined in the configs will be used
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
Create a KTable for the specified topic.
Input KeyValue
pairs with null
key will be dropped.
Note that the specified input topic must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
storeName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
timestampExtractor - the stateless timestamp extractor used for this source KTable, if not specified the default extractor defined in the configs will be used
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valueSerde, String topic)
Create a KTable for the specified topic.
The default "auto.offset.reset"
strategy as specified in the config
is used.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
Returns:
a KTable for the specified topic

public <K,V> KTable<K,V> table(Topology.AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
Create a KTable for the specified topic.
Input records
with null
key will be dropped.
Note that the specified input topics must be partitioned by key.
If this is not the case the returned KTable
will be corrupted.
The resulting KTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata()
to
query the value of the key on a parallel running instance of your Kafka Streams application.
Parameters:
offsetReset - the "auto.offset.reset" policy to use for the specified topic if no valid committed offsets are available
timestampExtractor - the stateless timestamp extractor used for this source KTable, if not specified the default extractor defined in the configs will be used
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
storeSupplier - user defined state store supplier; cannot be null
Returns:
a KTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(String topic, String queryableStoreName)
Create a GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config
are used.
Input records
with null
key will be dropped.
The resulting GlobalKTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable
always applies "auto.offset.reset"
strategy "earliest"
regardless of the specified value in StreamsConfig.
Parameters:
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a GlobalKTable for the specified topic
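A sketch of a global table used for a lookup-style join; the topic and store names are illustrative, and the key mapping simply assumes each order key starts with a two-letter country code:

StreamsBuilder builder = new StreamsBuilder();

// Every application instance keeps a full local copy of this table, so it can be joined
// against a KStream without re-partitioning the stream.
GlobalKTable<String, String> countryNames = builder.globalTable("country-codes", "country-codes-store");

KStream<String, String> orders = builder.stream("orders");
orders.join(countryNames,
            (orderKey, order) -> orderKey.substring(0, 2),                  // derive the lookup key (assumed key format)
            (order, countryName) -> order + " shipped to " + countryName)   // combine stream value and table value
      .to("orders-with-country");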
public <K,V> GlobalKTable<K,V> globalTable(String topic)
Create a GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config
are used.
Input records
with null
key will be dropped.
The resulting GlobalKTable
will be materialized in a local KeyValueStore
with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
Note that GlobalKTable
always applies "auto.offset.reset"
strategy "earliest"
regardless of the specified value in StreamsConfig.
Parameters:
topic - the topic name; cannot be null
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, TimestampExtractor timestampExtractor, String topic, String queryableStoreName)
Create a GlobalKTable for the specified topic.
The default TimestampExtractor
and default key and value deserializers as specified in
the config
are used.
Input KeyValue
pairs with null
key will be dropped.
The resulting GlobalKTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable
always applies "auto.offset.reset"
strategy "earliest"
regardless of the specified value in StreamsConfig.
Parameters:
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
timestampExtractor - the stateless timestamp extractor used for this source KTable, if not specified the default extractor defined in the configs will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic, StateStoreSupplier<KeyValueStore> storeSupplier)
Create a GlobalKTable for the specified topic.
The default TimestampExtractor
as specified in the config
is used.
Input KeyValue
pairs with null
key will be dropped.
The resulting GlobalKTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable
always applies "auto.offset.reset"
strategy "earliest"
regardless of the specified value in StreamsConfig.
Parameters:
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
storeSupplier - user defined state store supplier; cannot be null
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic, String queryableStoreName)
Create a GlobalKTable for the specified topic.
The default TimestampExtractor
as specified in the config
is used.
Input KeyValue
pairs with null
key will be dropped.
The resulting GlobalKTable
will be materialized in a local KeyValueStore
with the given
queryableStoreName
.
However, no internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
To query the local KeyValueStore
it must be obtained via
KafkaStreams#store(...)
:
KafkaStreams streams = ...
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-key";
Long valueForKey = localStore.get(key);
Note that GlobalKTable
always applies "auto.offset.reset"
strategy "earliest"
regardless of the specified value in StreamsConfig.
Parameters:
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
queryableStoreName - the state store name; if null an internal store name will be automatically given
Returns:
a GlobalKTable for the specified topic

public <K,V> GlobalKTable<K,V> globalTable(Serde<K> keySerde, Serde<V> valueSerde, String topic)
Create a GlobalKTable for the specified topic.
The default key and value deserializers as specified in the config
are used.
Input records
with null
key will be dropped.
The resulting GlobalKTable
will be materialized in a local KeyValueStore
with an internal
store name. Note that the store name may not be queryable through Interactive Queries.
No internal changelog topic is created since the original input topic can be used for recovery (cf.
methods of KGroupedStream
and KGroupedTable
that return a KTable
).
Note that GlobalKTable
always applies "auto.offset.reset"
strategy "earliest"
regardless of the specified value in StreamsConfig.
Parameters:
keySerde - key serde used to send key-value pairs, if not specified the default key serde defined in the configuration will be used
valueSerde - value serde used to send key-value pairs, if not specified the default value serde defined in the configuration will be used
topic - the topic name; cannot be null
Returns:
a GlobalKTable for the specified topic

public StreamsBuilder addStateStore(StateStoreSupplier supplier, String... processorNames)
Adds a state store to the underlying Topology.
Parameters:
supplier - the supplier used to obtain this state store StateStore instance
processorNames - the names of the processors that should be able to access the provided store
Throws:
TopologyException - if state store supplier is already added
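A sketch of registering a store; the Stores.create(...) factory chain shown here is the older StateStoreSupplier builder and is an assumption about this API generation, as are the store name and types:

StreamsBuilder builder = new StreamsBuilder();

// Register a persistent String-to-Long key-value store with the topology.
StateStoreSupplier<KeyValueStore> countStore = Stores.create("visit-counts")   // assumed older Stores factory
        .withStringKeys()
        .withLongValues()
        .persistent()
        .build();
builder.addStateStore(countStore);

// A downstream processor or transformer that names "visit-counts" among its stores can then read and update it.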
public StreamsBuilder addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier)
Adds a global StateStore to the topology.
The StateStore
sources its data from all partitions of the provided input topic.
There will be exactly one instance of this StateStore
per Kafka Streams instance.
A SourceNode
with the provided sourceName will be added to consume the data arriving from the partitions
of the input topic.
The provided ProcessorSupplier
will be used to create a ProcessorNode
that will receive all
records forwarded from the SourceNode
.
This ProcessorNode
should be used to keep the StateStore
up-to-date.
The default TimestampExtractor
as specified in the config
is used.
Parameters:
storeSupplier - user defined state store supplier
sourceName - name of the SourceNode that will be automatically added
keyDeserializer - the Deserializer to deserialize keys with
valueDeserializer - the Deserializer to deserialize values with
topic - the topic to source the data from
processorName - the name of the ProcessorSupplier
stateUpdateSupplier - the instance of ProcessorSupplier
Throws:
TopologyException - if the processor of state is already registered
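A sketch, under this page's older Processor API, of a global store that mirrors a topic of feature flags on every instance; the store, topic, and processor names, the Stores.create(...) factory chain (including disableLogging()), and the flag format are all illustrative assumptions:

StreamsBuilder builder = new StreamsBuilder();

// The input topic itself serves as the changelog, so logging on the store is disabled (assumed factory method).
StateStoreSupplier<KeyValueStore> flagStore = Stores.create("feature-flags")
        .withStringKeys()
        .withStringValues()
        .persistent()
        .disableLogging()
        .build();

ProcessorSupplier<String, String> flagUpdater = () -> new Processor<String, String>() {
    private KeyValueStore<String, String> store;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<String, String>) context.getStateStore("feature-flags");
    }

    @Override
    public void process(final String key, final String value) {
        store.put(key, value);   // keep the global store in sync with the input topic
    }

    @Override
    public void punctuate(final long timestamp) { }

    @Override
    public void close() { }
};

builder.addGlobalStore(flagStore, "feature-flags-source",
        new StringDeserializer(), new StringDeserializer(),
        "feature-flags-topic", "feature-flags-processor", flagUpdater);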
public StreamsBuilder addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier)
Adds a global StateStore to the topology.
The StateStore
sources its data from all partitions of the provided input topic.
There will be exactly one instance of this StateStore
per Kafka Streams instance.
A SourceNode
with the provided sourceName will be added to consume the data arriving from the partitions
of the input topic.
The provided ProcessorSupplier
will be used to create a ProcessorNode
that will receive all
records forwarded from the SourceNode
.
This ProcessorNode
should be used to keep the StateStore
up-to-date.
Parameters:
storeSupplier - user defined state store supplier
sourceName - name of the SourceNode that will be automatically added
timestampExtractor - the stateless timestamp extractor used for this source, if not specified the default extractor defined in the configs will be used
keyDeserializer - the Deserializer to deserialize keys with
valueDeserializer - the Deserializer to deserialize values with
topic - the topic to source the data from
processorName - the name of the ProcessorSupplier
stateUpdateSupplier - the instance of ProcessorSupplier
Throws:
TopologyException - if the processor of state is already registered