scala akka cluster-computing distributed-computing akka-cluster

How can I prevent/restrict an unauthorized node from joining an Akka cluster?


In the Akka Cluster examples, documentation, and tutorials, it seems that any node can join a cluster just by knowing the cluster and its seed nodes. How can I prevent unauthorized nodes from joining?


Solution

  • I was trying to achieve this and also did not find such a feature in Akka Cluster. I managed it by adding an authorization flag to the actor, wrapping the cluster in an Option[Cluster], and deciding in the preStart() method whether or not to subscribe the actor to the cluster. This already prevents the actor from receiving messages. However, it can still send messages to the cluster members, so I also put the flag in a guard on the message handler: case UserMessage(content) if (authorized) => ???. I did not kick the unauthorized node out of the cluster, but I believe that is just a matter of kicking it out once it is up.

    object Alice extends ChatApp("Alice", 2551, true)
    object Bob extends ChatApp("Bob", 2552, true)
    object Fabio extends ChatApp("Fabio", 2554, false)
    

    The actor:

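    import akka.actor.{Actor, ActorLogging, ActorSelection}
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent}

    class ChatActor(nickname: String, port: Int, authorized: Boolean) extends Actor with ActorLogging {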
      // 1: initialize the cluster object
      val cluster: Option[Cluster] = if (authorized) Some(Cluster(context.system)) else None
    
      // 2: subscribe to cluster event in preStart
      override def preStart(): Unit = {
        // cluster is None exactly when the user is not authorized, so match on it directly;
        // wrapped in an extra `if (authorized)`, the None branch could never run
        cluster match {
          case Some(c) => c.subscribe(
            self,
            initialStateMode = InitialStateAsEvents,
            classOf[MemberEvent]
          )
          case None => log.info(s"user [$nickname] is not authorized to enter in the cluster =(. Please leave the cluster.")
        }
      }
    
      // 3: unsubscribe self in postStop
      override def postStop(): Unit = {
        cluster match {
          case Some(c) => c.unsubscribe(self)
          case None => log.info(s"user [$nickname] is not authorized to enter in the cluster =(.")
        }
      }
    
      override def receive: Receive = online(Map())
    
      /** chatRoom is the data structure holding the users in the chat, keyed by address */
      def online(chatRoom: Map[String, String]): Receive = {
        case UserMessage(content) if (authorized) =>
          chatRoom.keys.foreach { address =>
            val chatActorSelection: ActorSelection = context.actorSelection(s"${address}/user/chatActor")
            chatActorSelection ! ChatMessage(nickname, content)
          }
      }
    }
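
The answer leaves the "kick it out once it is up" step unimplemented. A minimal sketch of that step, assuming a trusted member runs a watcher actor that compares each new member's address against an allow-list and downs anything else, might look like the following. The `Gatekeeper` name, the `isAllowed` helper, and the host/port allow-list are hypothetical and not part of the original answer; only `Cluster.subscribe`, `MemberUp`, and `Cluster.down` are Akka calls. Note that an address check is not real authentication, and the node is only removed after it has already joined.

```scala
import akka.actor.{Actor, ActorLogging, Address}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberUp}

// Hypothetical helper: a pure allow-list check, kept outside the actor so it is easy to test.
object Gatekeeper {
  def isAllowed(host: Option[String], port: Option[Int], allowed: Set[(String, Int)]): Boolean =
    (for (h <- host; p <- port) yield allowed.contains((h, p))).getOrElse(false)
}

// Hypothetical actor, meant to run on a node that is already a trusted cluster member.
class Gatekeeper(allowed: Set[(String, Int)]) extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  // Receive a MemberUp event for current members and for every member that joins later.
  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberUp])

  override def postStop(): Unit = cluster.unsubscribe(self)

  override def receive: Receive = {
    case MemberUp(member) =>
      val address: Address = member.address
      if (!Gatekeeper.isAllowed(address.host, address.port, allowed)) {
        log.warning("Downing member {} because it is not on the allow-list", address)
        // Marks the member as down so the cluster removes it.
        cluster.down(address)
      }
  }
}
```

With the ports from the example above and an assumed host of "localhost", the allow-list would be Set(("localhost", 2551), ("localhost", 2552)), so a node on port 2554 would be downed shortly after it comes up.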