apache-kafka, confluent-schema-registry, nomad

Kafka Schema registry - Advertise dynamic port in container


As I understand it, when using the Confluent Schema Registry Docker image (relying on Kafka rather than ZooKeeper for resiliency), we can advertise the container's hostname to Kafka with the SCHEMA_REGISTRY_HOST_NAME environment variable.

If I try to set SCHEMA_REGISTRY_PORT, I get the following error:

PORT is deprecated. Please use SCHEMA_REGISTRY_LISTENERS instead.

Why can't we set the associated port? I can obtain the dynamic port (the one the host machine maps dynamically to my container), but how am I supposed to share it with Kafka?

EDIT 1:

To add more detail, here is an example of an assignment made by the Schema Registry coordinator:

[2019-10-25 11:55:47,813] INFO Finished rebalance with master election result: Assignment{version=1, error=0, master='sr-1-7a9a403a-63cc-4fed-b548-10ea440863d5', masterIdentity=version=1,host=10.135.124.179,port=29932,scheme=http,masterEligibility=true} (io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector)

As you can see, the master identity contains a host (10.135.124.179) and a port (29932). The host comes from the SCHEMA_REGISTRY_HOST_NAME variable, but according to the code, the port comes from here:

/**
   * A Schema Registry instance's identity is in part the port it listens on. Currently the port can
   * either be configured via the deprecated `port` configuration, or via the `listeners`
   * configuration.
   *
   * <p>This method uses `Application.parseListeners()` from `rest-utils` to get a list of
   * listeners, and returns the port of the first listener to be used for the instance's identity.
   *
   * <p></p>In theory, any port from any listener would be sufficient. Choosing the first, instead
   * of say the last, is arbitrary.
   */
  // TODO: once RestConfig.PORT_CONFIG is deprecated, remove the port parameter.
  static SchemeAndPort getSchemeAndPortForIdentity(int port, List<String> configuredListeners,
                                                   String requestedScheme)

( https://github.com/confluentinc/schema-registry/blob/5af0ca3be1138fe483d0f90f4ccfd4f02f158334/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaSchemaRegistry.java#L211-L223 )

So the only way to advertise the port is to set it through the listeners, which can be annoying (but is still feasible as a workaround).
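
To make the rule in that Javadoc concrete, here is a minimal sketch of "take the port of the first listener". It is not the Schema Registry implementation: the class and method names are made up for illustration, it ignores the deprecated `port` parameter and the `requestedScheme` argument of the real method, and it assumes each listener is a plain URL such as `http://0.0.0.0:29932`.

```java
import java.net.URI;
import java.util.List;

// Illustrative sketch only; names are hypothetical, not from schema-registry.
final class ListenerPortSketch {

    // Small holder for the scheme and port that end up in the instance identity.
    static final class SchemeAndPort {
        final String scheme;
        final int port;

        SchemeAndPort(String scheme, int port) {
            this.scheme = scheme;
            this.port = port;
        }
    }

    // Returns the scheme and port of the first configured listener.
    static SchemeAndPort firstListener(List<String> listeners) {
        if (listeners.isEmpty()) {
            throw new IllegalArgumentException("at least one listener is required");
        }
        URI uri = URI.create(listeners.get(0).trim());
        if (uri.getPort() == -1) {
            throw new IllegalArgumentException("listener has no explicit port: " + uri);
        }
        return new SchemeAndPort(uri.getScheme(), uri.getPort());
    }

    public static void main(String[] args) {
        // Same comma-separated shape as the SCHEMA_REGISTRY_LISTENERS value.
        String listeners = "http://0.0.0.0:29932,http://0.0.0.0:8081";
        SchemeAndPort id = firstListener(List.of(listeners.split(",")));
        System.out.println(id.scheme + " " + id.port); // prints "http 29932"
    }
}
```

Under these assumptions, whatever port appears in the first listener is the one other instances will see in the master identity, which is why the listener itself has to carry the dynamic port.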


Solution

  • OK, I put together a working Nomad job file, in case it helps. The workaround is to make Schema Registry listen, inside the container, on the same dynamic port that Nomad allocates on the host, as follows:

    job "schema-registry" {
      datacenters = ["YOURDC"]
      type = "service"
    
      # Update strategy
      update {
        # Max instances (task groups) to be updated in parallel.
        max_parallel = 1
    
        # Once an allocation finishes, wait min_healthy_time until starting next one.
        min_healthy_time = "10s"
    
        # If allocation not healthy after healthy_deadline, mark as unhealthy.
        healthy_deadline = "3m"
    
        # If allocation unhealthy after progress_deadline, fail the deployment.
        progress_deadline = "10m"
    
        # Should auto revert to previous version if the deployment fails?
        auto_revert = false
    
        # Create n canaries.
        canary = 0
      }
    
      spread {
        attribute = "${attr.unique.hostname}"
        weight    = 100
      }
    
      migrate {
        # As in update stanza.
        max_parallel = 1
    
        # "checks" for health checks or "task_state" for task state.
        health_check = "checks"
    
        # As in update stanza.
        min_healthy_time = "10s"
    
        # As in update stanza.
        healthy_deadline = "5m"
      }
    
      # The "group" stanza defines a series of tasks that should be co-located on
      # the same Nomad client.
      group "schema-registry-group" {
    
        # count of instances of the "schema-registry" task group
        count = 3
    
        restart {
          # The number of attempts to run the job within the specified interval.
          attempts = 2
          interval = "30m"
    
          # The "delay" parameter specifies the duration to wait before restarting
          # a task after it has failed.
          delay = "5s"
    
          # What happens when the `attempts` limit is reached within `interval`?
          # - "delay" mode delays the next restart until the next interval,
          # - "fail" mode does not restart the task.
          mode = "fail"
        }
    
        # Use ephemeral disk shared between tasks instead of HDD
        ephemeral_disk {
          size = 300
        }
    
        task "schema-registry" {
          driver = "docker"
    
          # Driver (docker) settings.
          config {
            image = "confluentinc/cp-schema-registry"
    
            # We cannot use port_map: Schema Registry advertises the port taken from its
            # LISTENERS config, so the container must listen on the dynamic host port itself.
            #port_map {
            #  schema_registry_port = 8081
            #}
          }
    
          # Time allowed to shut down after SIGINT.
          # Caution! If you want to set this higher than 30s,
          # make sure max_kill_timeout allows that.
          kill_timeout = "30s"
          kill_signal = "SIGINT"
          shutdown_delay = "2s"
    
          env {
            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS="YOURBOOTSTRAPSERVER"
            SCHEMA_REGISTRY_HOST_NAME="${NOMAD_IP_schema_registry_port}"
            SCHEMA_REGISTRY_LISTENERS="http://0.0.0.0:${NOMAD_HOST_PORT_schema_registry_port}"
          }
    
          # Max required storage = (max_files * 2) * max_file_size (in MB)
          # *2 because there are separate log files for stdout and stderr
          logs {
            max_files     = 10
            max_file_size = 15
          }
    
          # Required resources.
          resources {
            cpu    = 500 # 500 MHz
            memory = 512
            network {
              port "schema_registry_port" {} # dynamic port, intentionally not remapped via port_map
            }
          }
    
          service {
            name = "schema-registry"
            tags = ["schema-registry"]
            port = "schema_registry_port"
            check {
              name     = "alive"
              type     = "tcp"
              interval = "60s"
              timeout  = "4s"
            }
          }
        }
      }
    }