jepsen.tests.kafka
This workload is intended for systems which behave like the popular Kafka queue. This includes Kafka itself, as well as compatible systems like Redpanda.
At the abstract level of this workload, these systems provide a set of totally-ordered append-only logs called partitions, each of which stores a single arbitrary (and, for our purposes, unique) message at a particular offset into the log. Partitions are grouped together into topics: each topic is therefore partially ordered.
Each client has a producer and a consumer aspect; in Kafka these are separate clients, but for Jepsen’s purposes we combine them. A producer can send a message to a topic-partition, which assigns it a unique, theoretically monotonically-increasing offset and saves it durably at that offset. A consumer can subscribe to a topic, in which case the system automatically assigns it any number of partitions in that topic: this assignment can change at any time. Consumers can also assign themselves specific partitions manually. When a consumer polls, it receives messages and their offsets from whatever topic-partitions it is currently assigned to, and advances its internal state so that the next poll (barring a change in assignment) receives the immediately following messages.
Operations
To subscribe to a new set of topics, we issue an operation like:
{:f :subscribe, :value [k1, k2, …]}
or
{:f :assign, :value [k1, k2, …]}
… where k1, k2, etc denote specific partitions. For subscribe, we convert those partitions to the topics which contain them, and subscribe to those topics; the database then controls which specific partitions we get. Just like the Kafka client API, both subscribe and assign replace the current topics for the consumer.
Assign ops can also have a special key :seek-to-beginning? true, which indicates that the client should seek to the beginning of all its partitions.
Reads and writes (and mixes thereof) are encoded as a vector of micro-operations:
{:f :poll, :value [op1, op2, …]}
{:f :send, :value [op1, op2, …]}
{:f :txn,  :value [op1, op2, …]}
Where :poll and :send denote transactions comprising only reads or writes, respectively, and :txn indicates a general-purpose transaction. Operations are of two forms:
[:send key value]
… instructs a client to append value to the integer key, which maps uniquely to a single topic and partition. These operations are returned as:
[:send key [offset value]]
where offset is the returned offset of the write, if available, or nil
if it is unknown (e.g. if the write times out).
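For example (keys, values, and offsets here are illustrative), an invocation like:
{:f :send, :value [[:send 3 5]]}
… might complete as:
{:f :send, :value [[:send 3 [12 5]]]}
… meaning value 5 was appended to key 3 at offset 12.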
Reads are invoked as:
[:poll]
… which directs the client to perform a single poll operation on its consumer. The results of that poll are expanded to:
[:poll {key1 [[offset1 value1] [offset2 value2] …], key2 […]}]
Where key1, key2, etc. are integer keys obtained from the topic-partitions returned by the call to poll, and the value for that key is a vector of [offset value] pairs, corresponding to the offset of that message in that particular topic-partition, and the value of the message: presumably, whatever was written by [:send key value] earlier.
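For example (again illustrative), a poll invocation:
{:f :poll, :value [[:poll]]}
… might complete as:
{:f :poll, :value [[:poll {3 [[12 5] [13 6]]}]]}
… meaning the consumer read values 5 and 6 at offsets 12 and 13 of key 3.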
When polling without using assign, clients should call .commitSync before returning a completion operation.
Before a transaction completes, we commit its offsets.
All transactions may return an optional key :rebalance-log, which is a vector of rebalancing events (changes in assigned partitions) that occurred during the execution of that transaction. Each rebalance event is a map like:
{:keys [k1 k2 …]}
This map may contain additional keys as well.
Topic-partition Mapping
We identify topics and partitions using abstract integer keys, rather than explicit topics and partitions. The client is responsible for mapping these keys bijectively to topics and partitions.
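One possible such bijection, sketched here under the assumption of a fixed number of partitions per topic; the names and layout are hypothetical, not what any particular client does:

(def partition-count 2) ; hypothetical: fixed number of partitions per topic

(defn k->topic-partition
  "Maps an integer key k to a [topic partition] pair."
  [k]
  [(str "t" (quot k partition-count)) (mod k partition-count)])

(defn topic-partition->k
  "Inverse of k->topic-partition."
  [topic partition]
  (+ (* partition-count (parse-long (subs topic 1))) partition))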
Analysis
From this history we can perform a number of analyses:
- For any observed value of a key, we check to make sure that its writer was either :ok or :info; if the writer :failed, we know this constitutes an aborted read.
- We verify that all sends and polls agree on the value for a given key and offset. We do not require contiguity in offsets, because transactions add invisible messages which take up an offset slot but are not visible to the API. If we find divergence, we know that Kafka disagreed about the value at some offset.
Having verified that each key-offset pair uniquely identifies a single value, we eliminate the offsets altogether and perform the remainder of the analysis purely in terms of keys and values. We construct a graph where vertices are values, and an edge v1 -> v2 means that v1 immediately precedes v2 in the offset order (ignoring gaps in the offsets, which we assume are due to transaction metadata messages); a minimal sketch of this construction appears after this list.
- For each key, we take the highest observed offset, and then check that every :ok :send operation with an equal or lower offset was also read by at least one consumer. If we find such an unread send, we know a write was lost!
- We build a dependency graph between pairs of transactions T1 and T2, where T1 != T2, like so:
ww. T1 sent v1 to key k at offset o1, and T2 sent v2 to k at offset o2, and o1 < o2 in the version order for k.
wr. T1 sent v1 to k, and T2’s highest read of k was v1.
rw. T1’s highest read of key k was at offset o1, and T2 sent a value to k at offset o2, and o1 < o2 in the version order for k.
Our use of “highest offset” is intended to capture the fact that each poll operation observes a range of offsets, but in general those offsets could have been generated by many transactions. If we drew wr edges for every offset polled, we’d generate superfluous edges–all writers are already related via ww dependencies, so the final wr edge, plus those ww edges, captures those earlier read values.
We draw rw edges only for the final versions of each key observed by a transaction. If we drew rw edges for an earlier version, we would incorrectly be asserting that later transactions were not observed!
We perform cycle detection and categorization of anomalies from this graph using Elle.
- Internal Read Contiguity: Within a transaction, each pair of reads on the same key should be directly related in the version order. If we observe a gap (e.g. v1 < … < v2), that indicates this transaction skipped over some values. If we observe an inversion (e.g. v2 < v1, or v2 < … < v1), then we know that the transaction observed an order which disagreed with the “true” order of the log.
- Internal Write Contiguity: Gaps between sequential pairs of writes to the same key are detected via Elle as write cycles. Inversions are not, so we check for them explicitly: a transaction sends v1, then v2, but v2 < v1 or v2 < … < v1 in the version order.
- Intermediate reads? I assume these happen constantly, but are they supposed to? It’s not totally clear what an intermediate read means here, but it might look like a transaction T1 which writes v1, v2, v3 to k, and another T2 which polls k and observes some but not all of those values. This might be captured as a wr-rw cycle in some cases, but perhaps not all, since we only generate rw edges for final reads.
- Precommitted reads. These occur when a transaction observes a value that it wrote. This is fine in most transaction systems, but illegal in Kafka, which assumes that consumers (running at read committed) never observe uncommitted records.
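As referenced above, here is a minimal sketch of the immediate-precedence construction, assuming the version order for a key has already been reduced to a vector of values in offset order with gaps removed (this is an illustration, not the actual implementation):

(defn version-order->edges
  "Maps each value to the value which immediately follows it in the log."
  [version-order]
  (into {} (map vec) (partition 2 1 version-order)))

;; e.g. (version-order->edges [:a :b :c]) => {:a :b, :b :c}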
allowed-error-types
(allowed-error-types test)
Redpanda does a lot of things that are interesting to know about, but not necessarily bad or against-spec. For instance, g0 cycles are normal in the Kafka transactional model, and g1c is normal with wr-only edges at read-uncommitted but not with read-committed. This is a very ad-hoc attempt to encode that so that Jepsen’s valid/invalid results are somewhat meaningful.
Takes a test, and returns a set of keyword error types (e.g. :poll-skip) which this test considers allowable.
analysis
(analysis history)
(analysis history opts)
Builds up intermediate data structures used to understand a history. Options include:
:directory - Used for generating output files
:ww-deps   - Whether to perform write-write inference on the basis of log offsets.
around-key-offset
(around-key-offset k offset history)
(around-key-offset k offset n history)
Filters a history to just those operations around a given key and offset; trimming their mops to just those regions as well.
around-key-value
(around-key-value k value history)
(around-key-value k value n history)
Filters a history to just those operations around a given key and value; trimming their mops to just those regions as well.
around-some
(around-some pred n coll)
Clips a sequence to just those elements near a predicate. Takes a predicate, a range n, and a sequence xs. Returns the series of all x in xs such that x is within n elements of some x’ matching the predicate.
assocv
(assocv v i value)
An assoc on vectors which allows you to assoc at arbitrary indexes, growing the vector as needed. When v is nil, constructs a fresh vector rather than a map.
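A minimal sketch consistent with this description (not necessarily the actual implementation):

(defn assocv
  [v i value]
  (let [v (or v [])]
    (if (<= i (count v))
      (assoc v i value)
      (-> v
          (into (repeat (- i (count v)) nil)) ; pad any gap with nils
          (conj value)))))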
condense-error
(condense-error test [type errs])
Takes a test and a pair of an error type (e.g. :lost-write) and a seq of errors. Returns a pair of [type, {:count n, :errors …}], which tries to show the most interesting or severe errors without making the pretty-printer dump out two gigabytes of EDN.
consume-counts
(consume-counts {:keys [history op-reads]})
Kafka transactions are supposed to offer ‘exactly once’ processing: a transaction using the subscribe workflow should be able to consume an offset and send something to an output queue, and if this transaction is successful, it should happen at most once. It’s not exactly clear to me how these semantics are supposed to work: it’s clearly not once per consumer group, because we routinely see duplicates with only one consumer group. As a fallback, we look for a single consumer per process, which should definitely hold, but appears not to.
We verify this property by looking at all committed transactions which performed a poll while subscribed (not assigned!) and keeping track of the number of times each key and value is polled. Yields a map of keys to values to consumed counts, wherever that count is more than one.
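For instance (hypothetical data), a result of {3 {7 2}} would mean that value 7 on key 3 was consumed twice by subscribed pollers.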
crash-client-gen
(crash-client-gen opts)
A generator which, if the test has :crash-clients? true, periodically emits an operation to crash a random client.
cycles!
(cycles! {:keys [history directory], :as analysis})
Finds a map of cycle names to cyclic anomalies in a partial analysis.
datafy-version-order-log
(datafy-version-order-log m)
Takes a Bifurcan integer map of Bifurcan sets and converts it to a vector of Clojure sets.
downsample-plot
(downsample-plot points)
Sometimes we wind up feeding absolutely huge plots to gnuplot, which chews up a lot of CPU time. We downsample these points, skipping points which are close in both x and y.
duplicate-cases
(duplicate-cases {:keys [version-orders]})
Takes a partial analysis and identifies cases where a single value appears at more than one offset in a key.
final-polls
(final-polls offsets)
Takes an atom containing a map of keys to offsets. Constructs a generator which:
- Checks the topic-partition state from the admin API
- Crashes the client, to force a fresh one to be opened, just in case there’s broken state inside the client.
- Assigns the new client to poll every key, and seeks to the beginning
- Polls repeatedly
This process repeats every 10 seconds until polls have caught up to the offsets in the offsets atom.
g1a-cases
(g1a-cases {:keys [history writes-by-type writer-of op-reads]})
Takes a partial analysis and looks for aborted reads, where a known-failed write is nonetheless visible to a committed read. Returns a seq of error maps, or nil if none are found.
graph
(graph analysis history)
A combined Elle dependency graph between completion operations.
index-seq
(index-seq xs)
Takes a seq of distinct values, and returns a map of:
{:by-index A vector of the sequence
 :by-value A map of values to their indices in the vector.}
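For example:
(index-seq [:a :b :c])
; => {:by-index [:a :b :c], :by-value {:a 0, :b 1, :c 2}}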
int-poll-skip+nonmonotonic-cases
(int-poll-skip+nonmonotonic-cases {:keys [history version-orders op-reads]})
Takes a partial analysis and looks for cases where a single transaction contains:
{:skip         A pair of poll values which read the same key and skip over some part of the log which we know should exist.
 :nonmonotonic A pair of poll values which contradict the log order, or repeat the same value.}
When a transaction’s rebalance log includes a key which would otherwise be involved in one of these violations, we don’t report it as an error: we assume that rebalances invalidate any assumption of monotonically advancing offsets.
int-poll-skip+nonmonotonic-cases-per-key
(int-poll-skip+nonmonotonic-cases-per-key version-orders op rebalanced-keys errs [k vs])
A reducer for int-poll-skip+nonmonotonic-cases. Takes version orders, an op, a rebalanced-keys set, a transient vector of error maps, and a [key values] pair from op-reads. Adds an error to the vector if one can be found for this key.
int-send-skip+nonmonotonic-cases
(int-send-skip+nonmonotonic-cases {:keys [history version-orders]})
Takes a partial analysis and looks for cases where a single transaction contains a pair of sends to the same key which:
{:skip         Skips over some indexes of the log.
 :nonmonotonic Goes backwards (or stays in the same place) in the log.}
interleave-subscribes
(interleave-subscribes opts txn-gen)
Takes CLI options (:sub-p) and a txn generator. Keeps track of the keys flowing through it, interspersing occasional :subscribe or :assign operations for recently seen keys.
key-order-viz
(key-order-viz k log history)
Takes a key, a log for that key (a vector of offsets to sets of elements which were observed at that offset) and a history of ops relevant to that key. Constructs an XML structure visualizing all sends/polls of that log’s offsets.
log->last-index->values
(log->last-index->values log)
Takes a log: a vector of sets of read values for each offset in a partition, possibly including nils. Returns a vector which takes indices (dense offsets) to sets of values whose last appearance was at that position.
log->value->first-index
(log->value->first-index log)
Takes a log: a vector of sets of read values for each offset in a partition, possibly including nils. Returns a map which takes a value to the index where it first appeared.
lost-write-cases
(lost-write-cases {:keys [history version-orders reads-by-type writer-of readers-of]})
Takes a partial analysis and looks for cases of lost write: where a write that we should have observed is somehow not observed. Of course we cannot expect to observe everything: for example, if we send a message to Redpanda at the end of a test, and don’t poll for it, there’s no chance of us seeing it at all! Or a poller could fall behind.
What we do instead is identify the highest read value for each key v_max, and then take the set of all values prior to it in the version order: surely, if we read v_max = 3, and the version order is 1 2 3 4, we should also have read 1 and 2.
It’s not quite this simple. If a message appears at multiple offsets, the version order will simply pick one for us, which leads to nondeterminism. If an offset has multiple messages, a successfully inserted message could appear nowhere in the version order.
To deal with this, we examine the raw logs for each key, and build two index structures. The first maps values to their earliest (index) appearance in the log: we use this to determine the highest index that must have been read. The second is a vector which maps indexes to sets of values whose last appearance in the log was at that index. We use this vector to identify which values ought to have been read.
Once we’ve derived the set of values we ought to have read for some key k, we run through each poll of k and cross off the values read. If there are any values left, they must be lost updates.
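A minimal sketch of the per-key core of this check, assuming the two index structures above are in hand (the names here are hypothetical):

(require '[clojure.set :as set])

(defn lost-writes-for-key
  [value->first-index  ; map: value -> earliest index in the log
   last-index->values  ; vector: index -> set of values last seen there
   read-values]        ; set of all values actually polled for this key
  (let [;; Highest index we can prove was read
        max-idx   (reduce max -1 (keep value->first-index read-values))
        ;; Everything whose last appearance was at or below that index
        must-read (into #{} (mapcat identity)
                        (take (inc max-idx) last-index->values))]
    (set/difference must-read read-values)))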
mop-index
(mop-index op f k v)
Takes an operation, a function f (:poll or :send), a key k, and a value v. Returns the index (0, 1, …) within that operation’s value which performed that poll or send, or nil if none could be found.
must-have-committed?
(must-have-committed? reads-by-type op)
Takes a reads-by-type map and a (presumably :info) transaction which sent something. Returns true iff the transaction was :ok, or if it was :info and we can prove that some send from this transaction was read.
nonmonotonic-send-cases
(nonmonotonic-send-cases {:keys [history by-process version-orders]})
Takes a partial analysis and checks each process’s operations sequentially, looking for cases where a single process’s sends to a given key go backwards relative to the version order.
op->max-offsets
(op->max-offsets op)
Takes an operation (presumably, an OK or info one) and returns a map of keys to the highest offsets interacted with, either via send or poll, in that op.
op->max-poll-offsets
(op->max-poll-offsets {:keys [type f value]})
Takes an operation and returns a map of keys to the highest offsets polled.
op->max-send-offsets
(op->max-send-offsets {:keys [type f value]})
Takes an operation and returns a map of keys to the highest offsets sent.
op-around-key-offset
(op-around-key-offset k offset op)
(op-around-key-offset k offset n op)
Takes an operation and returns that operation with its value trimmed so that any send/poll operations are constrained to just the given key, and values within n of the given offset. Returns nil if operation is not relevant.
op-around-key-value
(op-around-key-value k value op)
(op-around-key-value k value n op)
Takes an operation and returns that operation with its value trimmed so that any send/poll operations are constrained to just the given key, and values within n of the given value. Returns nil if operation is not relevant.
op-pairs
(op-pairs op)
Returns a map of keys to the sequence of all offset value pairs either written or read for that key; writes first.
op-read-offsets
(op-read-offsets op)
Returns a map of keys to the sequence of all offsets read for that key.
op-read-pairs
(op-read-pairs op)
Returns a map of keys to the sequence of all offset value pairs read for that key.
op-reads
(op-reads op)
Returns a map of keys to the sequence of all values read for that key.
op-reads-helper
(op-reads-helper op f)
Takes an operation and a function which takes an offset-value pair. Returns a map of keys read by this operation to the sequence of (f offset value) read for that key.
op-reads-index
(op-reads-index history)
We call op-reads a LOT. This takes a history and builds an efficient index, then returns a function which works just like op-reads, but is memoized.
op-write-offsets
(op-write-offsets op)
Returns a map of keys to the sequence of all offsets written to that key in an op.
op-write-pairs
(op-write-pairs op)
Returns a map of keys to the sequence of all offset value pairs written to that key in an op.
op-writes
(op-writes op)
Returns a map of keys to the sequence of all values written to that key in an op.
op-writes-helper
(op-writes-helper op f)
Takes an operation and a function which takes an offset-value pair. Returns a map of keys written by this operation to the sequence of (f offset value) sends for that key. Note that offset may be nil.
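A minimal sketch of this helper, assuming micro-ops of the shapes described above ([:send k v] when invoked, [:send k [offset v]] when completed); not necessarily the actual implementation:

(defn op-writes-helper
  [op f]
  (reduce (fn [m [fun k v]]
            (if (= fun :send)
              ;; completed sends carry [offset value]; invoked sends just a value
              (let [[offset value] (if (vector? v) v [nil v])]
                (update m k (fnil conj []) (f offset value)))
              m))
          {}
          (:value op)))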
plot-bounds
(plot-bounds points)
Quickly determine {:min-x, :max-x, :min-y, :max-y} from a series of x y points. Nil if there are no points.
plot-realtime-lag!
(plot-realtime-lag! test lags {:keys [nemeses subdirectory filename group-fn group-name]})
Takes a test, a collection of realtime lag measurements, and options (e.g. those to checker/check). Plots a graph file (realtime-lag.png) in the store directory.
plot-realtime-lags!
(plot-realtime-lags! {:keys [history], :as test} lags opts)
Constructs realtime lag plots for all processes together, and then another broken out by process, and also by key.
plot-unseen!
(plot-unseen! test unseen {:keys [subdirectory]})
Takes a test, a collection of unseen measurements, and options (e.g. those to checker/check). Plots a graph file (unseen.png) in the store directory.
poll-skip+nonmonotonic-cases
(poll-skip+nonmonotonic-cases {:keys [history by-process version-orders op-reads]})
Takes a partial analysis and checks each process’s operations sequentially, looking for cases where a single process either jumped backwards or skipped over some region of a topic-partition. Returns a task of a map:
{:nonmonotonic Cases where a process started polling at or before where a previous operation last left off.
 :skip         Cases where two successive operations by a single process skipped over one or more values for some key.}
poll-skip+nonmonotonic-cases-per-process
(poll-skip+nonmonotonic-cases-per-process version-orders op-reads ops)
Per-process helper for poll-skip+nonmonotonic cases.
poll-unseen
(poll-unseen gen)
Wraps a generator. Keeps track of every offset that is successfully sent, and every offset that’s successfully polled. When there’s a key that has some offsets which were sent but not polled, we consider that unseen. This generator occasionally rewrites assign/subscribe operations to try and catch up to unseen keys.
precommitted-read-cases
(precommitted-read-cases {:keys [history op-reads]})
Takes a partial analysis with a history and looks for a transaction which observed its own writes. Returns a vector of error maps, or nil if none are found.
This is legal in most DBs, but in Kafka’s model, sent values are supposed to be invisible to all pollers until their producing txn commits.
previous-value
(previous-value version-order v2)
Takes a version order for a key and a value. Returns the previous value in the version order, or nil if either we don’t know v2’s index or v2 was the first value in the version order.
readers-of
(readers-of history op-reads)
Takes a history and an op-reads fn, and builds a map of keys to values to vectors of completion operations which observed that value.
reads-by-type
(reads-by-type history op-reads)
Takes a history and an op-reads fn, and constructs a map of types (:ok, :info, :fail) to maps of keys to the set of all values which were read for that key. We use this to identify, for instance, the known-successful reads for some key as a part of finding lost updates.
reads-of-key
(reads-of-key k history)
(reads-of-key k v history)
Returns a seq of all operations which read the given key, and, optionally, read the given value.
reads-of-key-offset
(reads-of-key-offset k offset history)
Returns a seq of all operations which read the given key and offset.
reads-of-key-value
(reads-of-key-value k value history)
Returns a seq of all operations which read the given key and value.
realtime-lag
(realtime-lag history)
Takes a history and yields a series of maps of the form
{:process The process performing a poll
 :key     The key being polled
 :time    The time the read began, in nanos
 :lag     The realtime lag of this key, in nanos.}
The lag of a key k in a poll is the conservative estimate of how long it has been since the highest value in that poll was the final message in log k.
For instance, given:
{:time 1, :type :ok, :value [[:send :x [0 :a]]]}
{:time 2, :type :ok, :value [[:poll {:x [[0 :a]]}]]}
The lag of this poll is zero, since we observed the most recent completed write to x. However, suppose we then see:
{:time 3, :type :ok, :value [[:send :x [1 :b]]]}
{:time 4, :type :invoke, :value [[:poll]]}
{:time 5, :type :ok, :value [[:poll {:x []}]]}
The lag of this read is 4 - 3 = 1. By time 3, offset 1 must have existed for key x. However, the most recent offset we observed was 0, which could only have been the most recent offset up until the write of offset 1 at time 3. Since our read could have occurred as early as time 4, the lag is at least 1.
We might want to make this into actual [lower, upper] ranges, rather than just a lower bound on lag, but a conservative estimate feels OK for starters.
render-order-viz!
(render-order-viz! test {:keys [version-orders errors history], :as analysis})
Takes a test, an analysis, and for each key with certain errors renders an HTML timeline of how each operation perceived that key’s log.
stats-checker
(stats-checker)
(stats-checker c)
Wraps a (jepsen.checker/stats) with a new checker that returns the same results, except it won’t return :valid? false if :crash or :debug-topic-partitions ops always crash. You might want to wrap your existing stats checker with this.
strip-types
(strip-types ms)
Takes a collection of maps, and removes their :type fields. Returns nil if none remain.
tag-rw
(tag-rw gen)
Takes a generator and tags operations with :f :poll or :f :send if they consist entirely of polls or sends, respectively.
track-key-offsets
(track-key-offsets keys-atom gen)
Wraps a generator. Keeps track of every key that generator touches in the given atom, which is a map of keys to highest offsets seen.
txn-generator
(txn-generator la-gen)
unseen
(unseen {:keys [history op-reads]})
Takes a partial analysis and yields a series of maps like
{:time   The time in nanoseconds
 :unseen A map of keys to the number of messages in that key which have been successfully acknowledged, but not polled by any client.}
The final map in the series includes a :messages key: a map of keys to sets of messages that were unseen.
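For instance (hypothetical data), the final map might look like:
{:time 50000000000
 :unseen {3 2}
 :messages {3 #{7 9}}}
… meaning that at the end of the test, key 3 had two acknowledged messages, 7 and 9, which no client ever polled.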
version-orders
(version-orders history reads-by-type)
Takes a history and a reads-by-type structure. Constructs a map of:
{:orders A map of keys to orders for that key. Each order is a map of:
           {:by-index A vector which maps indices to single values, in log order.
            :by-value A map of values to indices in the log.
            :log      A vector which maps offsets to sets of values in log order.}
 :errors A series of error maps describing any incompatible orders, where a single offset for a key maps to multiple values.}
Offsets are directly from Kafka. Indices are dense offsets, removing gaps in the log.
Note that we infer version orders from sends only when we can prove their effects were visible, but from all polls, including :info and :fail ones. Why? Because unlike a traditional transaction, where you shouldn’t trust reads in aborted txns, pollers in Kafka’s transaction design are always supposed to emit safe data regardless of whether the transaction commits or not.
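For instance (hypothetical data), if key 0’s log was observed with value :a at offset 0 and value :b at offset 2, and offset 1 was never observed:
{:orders {0 {:by-index [:a :b]
             :by-value {:a 0, :b 1}
             :log      [#{:a} nil #{:b}]}}
 :errors []}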
version-orders-reduce-mop
(version-orders-reduce-mop logs mop)
Takes a logs object from version-orders and a micro-op, and integrates that micro-op’s information about offsets into the logs.
version-orders-update-log
(version-orders-update-log log offset value)
Updates a version orders log with the given offset and value.
workload
(workload opts)
Constructs a workload (a map with a generator, client, checker, etc) given an options map. Options are:
:crash-clients? If set, periodically emits a :crash operation which the client responds to with :info; this forces the client to be torn down and replaced by a fresh client.
:crash-client-interval How often, in seconds, to crash clients. Default is 30 seconds.
:sub-via A set of subscription methods: either #{:assign} or #{:subscribe}.
:txn? If set, generates transactions with multiple send/poll micro-operations.
:sub-p The probability that the generator emits an assign/subscribe op.
These options must also be present in the test map, because they are used by the checker, client, etc at various points. For your convenience, they are included in the workload map returned from this function; merging that map into your test should do the trick.
… plus those taken by jepsen.tests.cycle.append/test, e.g. :key-count, :min-txn-length, …
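A hypothetical usage sketch (the option values here are illustrative, and base-test is a stand-in for your own test map):

(require '[jepsen.tests.kafka :as kafka])

(def workload
  (kafka/workload {:crash-clients?        true
                   :crash-client-interval 30
                   :sub-via               #{:subscribe}
                   :txn?                  true
                   :sub-p                 1/64}))

;; Merge the workload's options into the test map so the client, checker,
;; etc. can see them (base-test is hypothetical).
(def test-map (merge base-test workload))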
worst-realtime-lag
(worst-realtime-lag lags)
Takes a seq of realtime lag measurements, and finds the point with the highest lag.
wr-graph
(wr-graph {:keys [writer-of readers-of op-reads]} history)
Analyzes a history to extract write-read dependencies. T1 < T2 iff T1 writes some v to k and T2 reads v from k.
writer-of
(writer-of history)
Takes a history and builds a map of keys to values to the completion operation which attempted to write that value.
writes-by-type
(writes-by-type history)
Takes a history and constructs a map of types (:ok, :info, :fail) to maps of keys to the set of all values which were written for that key. We use this to identify, for instance, what all the known-failed writes were for a given key.
writes-of-key
(writes-of-key k history)
(writes-of-key k v history)
Returns a seq of all operations which wrote the given key, and, optionally, sent the given value.
writes-of-key-offset
(writes-of-key-offset k offset history)
Returns a seq of all operations which wrote the given key and offset.
writes-of-key-value
(writes-of-key-value k value history)
Returns a seq of all operations which wrote the given key and value.
ww-graph
(ww-graph {:keys [writer-of version-orders ww-deps]} history)
Analyzes a history to extract write-write dependencies. T1 < T2 iff T1 sends some v1 to k and T2 sends some v2 to k and v1 < v2 in the version order.