ocfs2/dlm: Add message DLM_QUERY_NODEINFO

Add a new dlm message, DLM_QUERY_NODEINFO, that sends the attributes of
all registered nodes. The message is sent if the negotiated dlm protocol
is 1.1 or higher. If the node information sent by the joining node
differs from that held by any existing node, the join domain request is
rejected.

Signed-off-by: Sunil Mushran <sunil.mushran@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 4965075..78d428f 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -131,6 +131,7 @@
  *
  * New in version 1.1:
  *	- Message DLM_QUERY_REGION added to support global heartbeat
+ *	- Message DLM_QUERY_NODEINFO added to allow online node removes
  */
 static const struct dlm_protocol_version dlm_protocol = {
 	.pv_major = 1,
@@ -1123,6 +1124,173 @@
 	return status;
 }
 
+/*
+ * Compare the node table sent by a joining node against the nodes that
+ * o2nm_get_node_by_num() reports locally.  Returns 0 when both sides list
+ * the same node numbers with the same IPv4 address and port, or -EINVAL at
+ * the first node number where they differ.
+ */
+static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
+{
+	struct dlm_node_info *theirs;
+	struct o2nm_node *ours;
+	int num, idx, status = 0;
+
+	for (idx = 0; idx < qn->qn_numnodes; ++idx)
+		mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[idx].ni_nodenum,
+		     &(qn->qn_nodes[idx].ni_ipv4_address),
+		     ntohs(qn->qn_nodes[idx].ni_ipv4_port));
+
+	/* Walk every node number; the loop ends at the first mismatch. */
+	for (num = 0; num < O2NM_MAX_NODES && !status; ++num) {
+		ours = o2nm_get_node_by_num(num);
+		/* Find the joiner's entry for this node number, if any. */
+		theirs = NULL;
+		for (idx = 0; idx < qn->qn_numnodes && !theirs; ++idx)
+			if (qn->qn_nodes[idx].ni_nodenum == num)
+				theirs = &(qn->qn_nodes[idx]);
+
+		/* Node number unused on both sides. */
+		if (!ours && !theirs)
+			continue;
+
+		if (!ours) {
+			status = -EINVAL;
+			mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+			     "registered in joining node %d but not in "
+			     "local node %d\n", qn->qn_domain,
+			     theirs->ni_nodenum, &(theirs->ni_ipv4_address),
+			     ntohs(theirs->ni_ipv4_port),
+			     qn->qn_nodenum, dlm->node_num);
+		} else if (!theirs) {
+			status = -EINVAL;
+			mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
+			     "registered in local node %d but not in "
+			     "joining node %d\n", qn->qn_domain,
+			     ours->nd_num, &(ours->nd_ipv4_address),
+			     ntohs(ours->nd_ipv4_port),
+			     dlm->node_num, qn->qn_nodenum);
+		} else if (theirs->ni_nodenum != ours->nd_num ||
+			   theirs->ni_ipv4_port != ours->nd_ipv4_port ||
+			   theirs->ni_ipv4_address != ours->nd_ipv4_address) {
+			/* Known to both sides, but the attributes differ. */
+			status = -EINVAL;
+		}
+
+		if (ours)
+			o2nm_node_put(ours);
+	}
+
+	return status;
+}
+
+/*
+ * Send a DLM_QUERY_NODEINFO message listing every registered o2nm node to each
+ * node set in node_map except dlm->node_num.  Returns 0 when all sends report
+ * status 0 (or node_map is empty), else -ENOMEM or the first failing result.
+ */
+static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
+{
+	struct dlm_query_nodeinfo *msg;
+	struct dlm_node_info *slot;
+	struct o2nm_node *node;
+	int ret = 0, status, filled = 0, num, to;
+
+	if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
+		return 0;
+
+	msg = kzalloc(sizeof(*msg), GFP_KERNEL);
+	if (!msg) {
+		mlog_errno(-ENOMEM);
+		return -ENOMEM;
+	}
+
+	for (num = 0; num < O2NM_MAX_NODES; ++num) {
+		node = o2nm_get_node_by_num(num);
+		if (!node)
+			continue;
+		slot = &msg->qn_nodes[filled++];
+		slot->ni_nodenum = node->nd_num;
+		slot->ni_ipv4_port = node->nd_ipv4_port;
+		slot->ni_ipv4_address = node->nd_ipv4_address;
+		mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
+		     &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
+		o2nm_node_put(node);
+	}
+
+	msg->qn_numnodes = filled;
+	msg->qn_nodenum = dlm->node_num;
+	msg->qn_namelen = strlen(dlm->name);
+	memcpy(msg->qn_domain, dlm->name, msg->qn_namelen);
+
+	for (to = find_next_bit(node_map, O2NM_MAX_NODES, 0);
+	     to < O2NM_MAX_NODES;
+	     to = find_next_bit(node_map, O2NM_MAX_NODES, to + 1)) {
+		if (to == dlm->node_num)
+			continue;
+		mlog(0, "Sending nodeinfo to node %d\n", to);
+		ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY, msg,
+					 sizeof(*msg), to, &status);
+		if (ret >= 0)
+			ret = status;
+		if (ret) {
+			mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, to);
+			break;
+		}
+	}
+	kfree(msg);
+	return ret;
+}
+
+/*
+ * Handler registered for DLM_QUERY_NODEINFO.  Compares the sender's node
+ * table with the local one via dlm_match_nodes() and returns its result,
+ * provided the sender is the node currently joining the named domain and
+ * the active dlm protocol is not 1.0.  All other cases return -EINVAL.
+ */
+static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
+				      void *data, void **ret_data)
+{
+	struct dlm_query_nodeinfo *qn = (struct dlm_query_nodeinfo *) msg->buf;
+	struct dlm_ctxt *dlm;
+	int status = -EINVAL;
+
+	mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
+	     qn->qn_domain);
+
+	spin_lock(&dlm_domain_lock);
+	dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
+	if (!dlm) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
+		     "join domain\n", qn->qn_nodenum, qn->qn_domain);
+		goto unlock_domain;
+	}
+
+	/* dlm->spinlock is taken while dlm_domain_lock is still held. */
+	spin_lock(&dlm->spinlock);
+	if (dlm->joining_node != qn->qn_nodenum) {
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
+		     "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
+		     dlm->joining_node);
+	} else if (dlm->dlm_locking_proto.pv_major == 1 &&
+		   dlm->dlm_locking_proto.pv_minor == 0) {
+		/* Support for node query was added in 1.1 */
+		mlog(ML_ERROR, "Node %d queried nodes on domain %s "
+		     "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
+		     qn->qn_domain, dlm->dlm_locking_proto.pv_major,
+		     dlm->dlm_locking_proto.pv_minor);
+	} else {
+		/* Checks passed: compare the node tables. */
+		status = dlm_match_nodes(dlm, qn);
+	}
+	spin_unlock(&dlm->spinlock);
+
+unlock_domain:
+	spin_unlock(&dlm_domain_lock);
+
+	return status;
+}
+
 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data)
 {
@@ -1443,8 +1611,13 @@
 	set_bit(dlm->node_num, dlm->domain_map);
 	spin_unlock(&dlm->spinlock);
 
-	/* Support for global heartbeat was added in 1.1 */
+	/* Support for global heartbeat and node info was added in 1.1 */
 	if (dlm_protocol.pv_major > 1 || dlm_protocol.pv_minor > 0) {
+		status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
+		if (status) {
+			mlog_errno(status);
+			goto bail;
+		}
 		status = dlm_send_regions(dlm, ctxt->yes_resp_map);
 		if (status) {
 			mlog_errno(status);
@@ -2026,6 +2199,13 @@
 					dlm_query_region_handler,
 					NULL, NULL, &dlm_join_handlers);
 
+	if (status)
+		goto bail;
+
+	status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
+					sizeof(struct dlm_query_nodeinfo),
+					dlm_query_nodeinfo_handler,
+					NULL, NULL, &dlm_join_handlers);
 bail:
 	if (status < 0)
 		dlm_unregister_net_handlers();