Lately I’ve been writing some Chef code. One of the best things about Chef
is custom resources:
https://docs.chef.io/custom_resources.html

Let’s look at an example of how to create a Kafka topic using Chef, and how to
make that operation idempotent, so that running it repeatedly leaves the topic
in the same state.

Before writing any Chef code, it is important to understand how to manage a
topic (a grouping of messages of a similar type) from the command line.

From the Kafka install directory, first check if the topic already exists:

bin/kafka-topics.sh \
    --zookeeper localhost:2181 \
    --describe \
    --topic <name>

If it doesn’t exist, create it:

bin/kafka-topics.sh \
    --zookeeper localhost:2181 \
    --create \
    --topic <name> \
    --replication-factor <value> \
    --partitions <value>

If the topic exists, increase its number of partitions (the unit Kafka uses to
parallelize consumption of a topic's messages) with --alter. Kafka only allows
the partition count to grow; it can never be decreased:

bin/kafka-topics.sh \
    --zookeeper localhost:2181 \
    --alter \
    --topic <name> \
    --partitions <value>
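Since the Chef resource will shell out to these same commands, it can help to build them as argument arrays rather than joined strings, so topic names never pass through shell word-splitting. This is a minimal sketch; kafka_topics_argv is a hypothetical helper (not part of Kafka or Chef) and only covers the three kafka-topics.sh invocations shown above.

```ruby
# Hypothetical helper: builds the argv for bin/kafka-topics.sh.
# An argument array can be run without a shell, so no quoting is needed.
def kafka_topics_argv(directory, zookeeper, topic, mode, partitions: nil, replicas: nil)
  argv = ["#{directory}/bin/kafka-topics.sh", "--zookeeper", zookeeper]

  case mode
  when :describe
    argv += ["--describe", "--topic", topic]
  when :create
    argv += ["--create", "--topic", topic,
             "--replication-factor", replicas.to_s,
             "--partitions", partitions.to_s]
  when :alter
    # --alter changes the partition count only (and only upward).
    argv += ["--alter", "--topic", topic, "--partitions", partitions.to_s]
  else
    raise ArgumentError, "unknown mode: #{mode.inspect}"
  end

  argv
end

p kafka_topics_argv("/kafka", "localhost:2181", "example", :alter, partitions: 6)
```

Ruby's standard library can run such an array without a shell, for example with Open3.capture3(*argv) after require "open3".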

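If the topic exists, increasing the number of replicas (copies of each
partition, kept for data redundancy) needs the partition reassignment tool,
because kafka-topics.sh --alter cannot change the replication factor: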

bin/kafka-reassign-partitions.sh \
    --zookeeper localhost:2181 \
    --reassignment-json-file increase-replication-factor.json \
    --execute

The file increase-replication-factor.json lists, for each partition, the IDs
of the brokers that should hold its replicas:

{
    "version": 1,
    "partitions": [
        {
            "topic": "foo",
            "partition": 0,
            "replicas": [1, 2, 3]
        }
    ]
}
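Writing this file by hand gets tedious for topics with many partitions. Below is a minimal sketch that generates it; reassignment_plan is a hypothetical helper, and spreading replicas round-robin across brokers is an assumption made for illustration (any set of distinct broker IDs per partition is valid input for the tool).

```ruby
require "json"

# Hypothetical helper: gives every partition of `topic` `replication_factor`
# replicas, assigned round-robin over `broker_ids` (an illustrative choice).
def reassignment_plan(topic, partition_count, broker_ids, replication_factor)
  if replication_factor > broker_ids.size
    raise ArgumentError,
          "replication factor #{replication_factor} exceeds #{broker_ids.size} brokers"
  end

  partitions = (0...partition_count).map do |partition|
    # Offsets are smaller than the broker count, so IDs never repeat.
    replicas = (0...replication_factor).map do |offset|
      broker_ids[(partition + offset) % broker_ids.size]
    end
    { "topic" => topic, "partition" => partition, "replicas" => replicas }
  end

  { "version" => 1, "partitions" => partitions }
end

plan = reassignment_plan("foo", 2, [1, 2, 3], 3)
puts JSON.pretty_generate(plan)
```

Rotating the starting broker matters because Kafka treats the first broker in each replicas list as the partition's preferred leader, so this keeps leadership from piling up on one broker.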

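The basic custom resource structure (a file such as resources/topic.rb in a
cookbook named example, which Chef exposes as the example_topic resource):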


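property :topic,      String,  name_property: true
property :partitions, Integer, default: 3
property :replicas,   Integer, default: 3
# Connection settings, not part of the topic's state: desired_state: false
# keeps Chef from resetting or comparing them when loading the current value.
property :zookeeper,  String,  default: "localhost:2181", desired_state: false
property :directory,  String,  default: "/kafka", desired_state: false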

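action :create do
  cmd = %W(
    #{new_resource.directory}/bin/kafka-topics.sh
    --zookeeper #{new_resource.zookeeper}
    --create
    --topic #{new_resource.topic}
    --replication-factor #{new_resource.replicas}
    --partitions #{new_resource.partitions}
  ).join(" ")

  # The result is ignored for now: this naive version runs on every converge.
  shell_out(cmd)
end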

Using the resource (cookbook example):

example_topic "example" do
  partitions  1
  replicas    1
end

With this code, creating topics is possible. However, it doesn’t check whether
the topic already exists or whether the settings are valid.


ChefSpec

ChefSpec is a unit-testing framework for Chef cookbooks.

To test a custom resource, first define a ChefSpec matcher for it:

if defined?(ChefSpec)

  def create_example_topic(resource_name)
    ChefSpec::Matchers::ResourceMatcher.new(:example_topic, :create, resource_name)
  end

end

Generate a test_cookbook:

mkdir -p test/cookbooks
chef generate cookbook test/cookbooks/test_example

Create the file test/cookbooks/test_example/recipes/create.rb:

example_topic "example" do
  partitions  1
  replicas    1
end

Update the file test/cookbooks/test_example/metadata.rb:

name "test_example"
maintainer "The Authors"
maintainer_email "you@example.com"
license "all_rights"
description "Installs/Configures test"
long_description "Installs/Configures test"
version "0.1.0"

depends "example" # example cookbook dependency

Write the spec for the create recipe, spec/unit/recipes/create_spec.rb:

require "spec_helper"

context "test_example" do
  let(:chef_conf) do
    ChefSpec::SoloRunner.new cookbook_path: %w(./test/cookbooks ../), # import test and example cookbook
                             step_into:     %w(example_topic) # dive inside example_topic
  end

  let(:shellout) { double("shellout") } # setup shellout double

  before do
    allow(Mixlib::ShellOut).to receive(:new).and_return(shellout)
    allow(shellout).to receive_messages(
      :run_command => nil,
      :error! => nil,
      :live_stream => nil,
      :live_stream= => nil
    )
  end

  describe "topic create" do
    let(:chef_run) { chef_conf.converge("test_example::create") }

    it "converges successfully" do
      allow(shellout).to receive(:stdout).and_return("", "Created topic")

      expect { chef_run }.to_not raise_error
      expect(chef_run).to create_example_topic("example") # custom matcher in action
    end
  end
end

Now we can improve the resource with confidence: if a change breaks it, the
tests will tell us.


At the moment, the resource is quite simple and tries to create the topic on
every Chef run. Now it’s time to improve it!

As seen before, we can describe a topic:

bin/kafka-topics.sh \
    --zookeeper localhost:2181 \
    --describe \
    --topic <name>

The output will be something like:


[Image: kafka describe topic output]

Example code to parse this output:

def list(directory, zookeeper, topic)
  cmd = %W(
    #{directory}/bin/kafka-topics.sh
    --zookeeper #{zookeeper}
    --describe
    --topic #{topic}
  ).join(" ")

  result = shell_out(cmd).stdout
  extract = ->(a) { a && a.to_i }

  {
    partitions: extract.call(result[/PartitionCount:\s*(\d+)/, 1]),
    replicas: extract.call(result[/ReplicationFactor:\s*(\d+)/, 1])
  }
end
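Separating the parsing from the shell call makes it testable without a running Kafka cluster. A minimal sketch: parse_describe is a hypothetical name, and the sample text only approximates the describe output, whose exact layout varies between Kafka versions (which is why the regexes accept optional whitespace after the colon).

```ruby
# Hypothetical pure function: extracts partition count and replication factor
# from `kafka-topics.sh --describe` output. Empty output (topic not found)
# yields nil for both values.
def parse_describe(output)
  extract = ->(match) { match && match.to_i }

  {
    partitions: extract.call(output[/PartitionCount:\s*(\d+)/, 1]),
    replicas:   extract.call(output[/ReplicationFactor:\s*(\d+)/, 1])
  }
end

# Illustrative sample only; real output differs between Kafka versions.
sample = "Topic:example\tPartitionCount:3\tReplicationFactor:2\tConfigs:\n" \
         "\tTopic: example\tPartition: 0\tLeader: 1\tReplicas: 1,2\tIsr: 1,2\n"

p parse_describe(sample).values_at(:partitions, :replicas) # => [3, 2]
p parse_describe("").values_at(:partitions, :replicas)     # => [nil, nil]
```

Note that the hash keys are symbols, so values_at must be given symbols too; string keys would silently return nils.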

With this helper, load_current_value gives Chef a consistent view of the
topic's actual state, which Chef then compares against the desired state:

load_current_value do
  current_partitions, current_replicas = list(directory, zookeeper, topic)
                                         .values_at(:partitions, :replicas) # keys are symbols

  current_value_does_not_exist! unless current_partitions && current_replicas

  partitions current_partitions
  replicas current_replicas
end

Refactoring the create action:

action :create do
  # calls the block if current_resource does not exist
  # or values are different from current_resource
  converge_if_changed do
    if current_resource
      action_update
    else
      converge_by "Create topic #{new_resource.topic}" do
        create! new_resource.directory,
                new_resource.zookeeper,
                new_resource.topic,
                new_resource.partitions,
                new_resource.replicas
      end
    end
  end
end

The update action:

action :update do
  alter! new_resource.directory,
         new_resource.zookeeper,
         new_resource.topic,
         new_resource.partitions
end

require "chef/log"

def alter!(directory, zookeeper, topic, partitions)
  cmd = %W(
    #{directory}/bin/kafka-topics.sh
    --zookeeper #{zookeeper}
    --alter
    --topic #{topic}
    --partitions #{partitions}
  ).join(" ")

  result = shell_out(cmd)
  Chef::Log.error(result.stderr) unless result.stdout =~ /Adding partitions succeeded!/
end
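The decision that load_current_value and converge_if_changed make together can be written out as plain Ruby, which makes the edge cases explicit. A sketch under stated assumptions: plan_topic_change is hypothetical, it treats a partition decrease as an error because Kafka cannot shrink a topic, and it reports a replica change as a separate step because kafka-topics.sh --alter cannot change the replication factor.

```ruby
# Hypothetical helper: given the current state (nil when the topic does not
# exist) and the desired state, decide what a Chef run should do.
# An empty list means the topic is already converged.
def plan_topic_change(current, desired)
  return [[:create, desired]] if current.nil?

  steps = []

  if desired[:partitions] < current[:partitions]
    raise ArgumentError,
          "cannot reduce partitions from #{current[:partitions]} to #{desired[:partitions]}"
  elsif desired[:partitions] > current[:partitions]
    steps << [:add_partitions, desired[:partitions]]    # kafka-topics.sh --alter
  end

  if desired[:replicas] != current[:replicas]
    steps << [:reassign_replicas, desired[:replicas]]   # kafka-reassign-partitions.sh
  end

  steps
end

desired = { partitions: 3, replicas: 2 }
p plan_topic_change(nil, desired)                            # a single :create step
p plan_topic_change({ partitions: 1, replicas: 2 }, desired) # => [[:add_partitions, 3]]
p plan_topic_change({ partitions: 3, replicas: 2 }, desired) # => []
```

A second run against an already-converged topic yields an empty plan, which is the idempotency property. It also shows that the update action above, which only alters partitions, would need a separate step to handle a replica change.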

This small example shows how powerful the Chef DSL is, and how
load_current_value and converge_if_changed help in creating idempotent resources.

The full example (with tests) is available on GitHub.