Opentelemetry provides vendor neutral standards and implementations to generate, collect, and export tracing data.

A Context is a propagation mechanism which carries execution-scoped values across API boundaries and between logically associated execution units. Cross-cutting concerns access their data in-process using the same shared Context object.

Context propagation is required when the tracing needs to cross process or service boundaries. Common ways to propagate the context is using W3C Trace Context or Zipkin B3 headers, while to inject the context is, for example, http headers or metadata fields in event messages.

Sometimes there is no clear way to propagate the context, for example, when using Log-Based Change Data Capture:

url

There is no simple mechanism to propagate “Application A” context to “Application B” without, for example, polluting records with ephemeral data or using additional datastore.

There are some solutions like sqlcommenter that rely on log analysis, but don’t address the propagation.

Inside the WAL

In PostgreSQL 9.6 pg_logical_emit_message was introduced, which can be used to pass generic messages to logical decoding plugins through WAL.

pgoutput supports logical decoding messages since version 14, when the flag messages is set to true.

Setting pg_logical_emit_message transactional flag to true it is possible to aggregate the message with transaction.

As an example, defining the following helper function:

const propagationQuery = `select pg_logical_emit_message(true,'otelwal',$1);`

type stringMap map[string]string

func Inject(ctx context.Context, tx *sql.Tx, p propagation.TextMapPropagator) error {
	m := make(stringMap)
	p.Inject(ctx, m)

	rows, err := tx.QueryContext(ctx, propagationQuery, m)
	if err != nil {
		return err
	}

	return rows.Close()
}

With helper function a user can manually propagate the context:

tx, err := db.BeginTx(ctx, nil)
if err != nil { ... }

err = otelwal.Inject(ctx, tx, propagation.TraceContext{})
if err != nil { ... }

_, err = tx.Exec(`insert into foo (id, val) values ($1,$2)`, 1, 2)
if err != nil { ... }

err = tx.Commit()
if err != nil { ... }

Checking emitted changes using wal2json:

{
      "change": [
            {
                  "kind": "message",
                  "transactional": true,
                  "prefix": "otelwal",
                  "content": "{\"traceparent\":\"00-8a9e294a6655a610899ec9f09dfff0d0-07c8d67a7650d803-01\"}"
            },
            {
                  "kind": "insert",
                  "schema": "public",
                  "table": "foo",
                  "columnnames": ["id", "val"],
                  "columntypes": ["integer", "integer"],
                  "columnvalues": [1, 2]
            }
      ]
}

With this information, the cdc library can pick the context and propagate accordingly.

Conclusion

Using pg_logical_emit_message with conjunction with transactions allows to propagate opentelemetry tracing information across the database layer without requiring extra machinery at the expense of extra transactions and bytes across the wire.