ocfs2: Negotiate locking protocol versions.

Currently, when ocfs2 nodes connect via TCP, they advertise their
compatibility level.  If the versions do not match, two nodes cannot speak
to each other and they disconnect. As a result, the current scheme provides
no forward or backward compatibility.

This patch implements a simple protocol negotiation at the dlm level by
introducing a major/minor version number scheme for entities that
communicate.  Specifically, o2dlm has a major/minor version for interaction
with o2dlm on other nodes, and ocfs2 itself has a major/minor version for
interacting with the filesystem on other nodes.

This will allow rolling upgrades of ocfs2 clusters when changes to the
locking or network protocols can be done in a backwards compatible manner.
In those cases, only the minor number is changed and the negotiated protocol
minor is returned from dlm join. In the far less likely event that a
required protocol change makes backwards compatibility impossible, we simply
bump the major number.

Signed-off-by: Joel Becker <joel.becker@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmdomain.c b/fs/ocfs2/dlm/dlmdomain.c
index 6954565..638d2eb 100644
--- a/fs/ocfs2/dlm/dlmdomain.c
+++ b/fs/ocfs2/dlm/dlmdomain.c
@@ -123,6 +123,17 @@
 LIST_HEAD(dlm_domains);
 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
 
+/*
+ * The supported protocol version for DLM communication.  Running domains
+ * will have a negotiated version with the same major number and a minor
+ * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
+ * be used to determine what a running domain is actually using.
+ */
+static const struct dlm_protocol_version dlm_protocol = {
+	.pv_major = 1,
+	.pv_minor = 0,
+};
+
 #define DLM_DOMAIN_BACKOFF_MS 200
 
 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -133,6 +144,8 @@
 				   void **ret_data);
 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
 				   void **ret_data);
+static int dlm_protocol_compare(struct dlm_protocol_version *existing,
+				struct dlm_protocol_version *request);
 
 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
 
@@ -668,11 +681,45 @@
 }
 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
 
+static int dlm_query_join_proto_check(char *proto_type, int node,
+				      struct dlm_protocol_version *ours,
+				      struct dlm_protocol_version *request)
+{
+	int rc;
+	struct dlm_protocol_version proto = *request;
+
+	if (!dlm_protocol_compare(ours, &proto)) {
+		mlog(0,
+		     "node %u wanted to join with %s locking protocol "
+		     "%u.%u, we respond with %u.%u\n",
+		     node, proto_type,
+		     request->pv_major,
+		     request->pv_minor,
+		     proto.pv_major, proto.pv_minor);
+		request->pv_minor = proto.pv_minor;
+		rc = 0;
+	} else {
+		mlog(ML_NOTICE,
+		     "Node %u wanted to join with %s locking "
+		     "protocol %u.%u, but we have %u.%u, disallowing\n",
+		     node, proto_type,
+		     request->pv_major,
+		     request->pv_minor,
+		     ours->pv_major,
+		     ours->pv_minor);
+		rc = 1;
+	}
+
+	return rc;
+}
+
 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
 				  void **ret_data)
 {
 	struct dlm_query_join_request *query;
-	enum dlm_query_join_response response;
+	union dlm_query_join_response response = {
+		.packet.code = JOIN_DISALLOW,
+	};
 	struct dlm_ctxt *dlm = NULL;
 	u8 nodenum;
 
@@ -690,11 +737,11 @@
 		mlog(0, "node %u is not in our live map yet\n",
 		     query->node_idx);
 
-		response = JOIN_DISALLOW;
+		response.packet.code = JOIN_DISALLOW;
 		goto respond;
 	}
 
-	response = JOIN_OK_NO_MAP;
+	response.packet.code = JOIN_OK_NO_MAP;
 
 	spin_lock(&dlm_domain_lock);
 	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
@@ -713,7 +760,7 @@
 				mlog(0, "disallow join as node %u does not "
 				     "have node %u in its nodemap\n",
 				     query->node_idx, nodenum);
-				response = JOIN_DISALLOW;
+				response.packet.code = JOIN_DISALLOW;
 				goto unlock_respond;
 			}
 		}
@@ -733,30 +780,48 @@
 			/*If this is a brand new context and we
 			 * haven't started our join process yet, then
 			 * the other node won the race. */
-			response = JOIN_OK_NO_MAP;
+			response.packet.code = JOIN_OK_NO_MAP;
 		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
 			/* Disallow parallel joins. */
-			response = JOIN_DISALLOW;
+			response.packet.code = JOIN_DISALLOW;
 		} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
 			mlog(0, "node %u trying to join, but recovery "
 			     "is ongoing.\n", bit);
-			response = JOIN_DISALLOW;
+			response.packet.code = JOIN_DISALLOW;
 		} else if (test_bit(bit, dlm->recovery_map)) {
 			mlog(0, "node %u trying to join, but it "
 			     "still needs recovery.\n", bit);
-			response = JOIN_DISALLOW;
+			response.packet.code = JOIN_DISALLOW;
 		} else if (test_bit(bit, dlm->domain_map)) {
 			mlog(0, "node %u trying to join, but it "
 			     "is still in the domain! needs recovery?\n",
 			     bit);
-			response = JOIN_DISALLOW;
+			response.packet.code = JOIN_DISALLOW;
 		} else {
 			/* Alright we're fully a part of this domain
 			 * so we keep some state as to who's joining
 			 * and indicate to him that needs to be fixed
 			 * up. */
-			response = JOIN_OK;
-			__dlm_set_joining_node(dlm, query->node_idx);
+
+			/* Make sure we speak compatible locking protocols.  */
+			if (dlm_query_join_proto_check("DLM", bit,
+						       &dlm->dlm_locking_proto,
+						       &query->dlm_proto)) {
+				response.packet.code =
+					JOIN_PROTOCOL_MISMATCH;
+			} else if (dlm_query_join_proto_check("fs", bit,
+							      &dlm->fs_locking_proto,
+							      &query->fs_proto)) {
+				response.packet.code =
+					JOIN_PROTOCOL_MISMATCH;
+			} else {
+				response.packet.dlm_minor =
+					query->dlm_proto.pv_minor;
+				response.packet.fs_minor =
+					query->fs_proto.pv_minor;
+				response.packet.code = JOIN_OK;
+				__dlm_set_joining_node(dlm, query->node_idx);
+			}
 		}
 
 		spin_unlock(&dlm->spinlock);
@@ -765,9 +830,9 @@
 	spin_unlock(&dlm_domain_lock);
 
 respond:
-	mlog(0, "We respond with %u\n", response);
+	mlog(0, "We respond with %u\n", response.packet.code);
 
-	return response;
+	return response.intval;
 }
 
 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
@@ -899,10 +964,11 @@
 
 static int dlm_request_join(struct dlm_ctxt *dlm,
 			    int node,
-			    enum dlm_query_join_response *response)
+			    enum dlm_query_join_response_code *response)
 {
-	int status, retval;
+	int status;
 	struct dlm_query_join_request join_msg;
+	union dlm_query_join_response join_resp;
 
 	mlog(0, "querying node %d\n", node);
 
@@ -910,12 +976,15 @@
 	join_msg.node_idx = dlm->node_num;
 	join_msg.name_len = strlen(dlm->name);
 	memcpy(join_msg.domain, dlm->name, join_msg.name_len);
+	join_msg.dlm_proto = dlm->dlm_locking_proto;
+	join_msg.fs_proto = dlm->fs_locking_proto;
 
 	/* copy live node map to join message */
 	byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
 
 	status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
-				    sizeof(join_msg), node, &retval);
+				    sizeof(join_msg), node,
+				    &join_resp.intval);
 	if (status < 0 && status != -ENOPROTOOPT) {
 		mlog_errno(status);
 		goto bail;
@@ -928,14 +997,41 @@
 	if (status == -ENOPROTOOPT) {
 		status = 0;
 		*response = JOIN_OK_NO_MAP;
-	} else if (retval == JOIN_DISALLOW ||
-		   retval == JOIN_OK ||
-		   retval == JOIN_OK_NO_MAP) {
-		*response = retval;
+	} else if (join_resp.packet.code == JOIN_DISALLOW ||
+		   join_resp.packet.code == JOIN_OK_NO_MAP) {
+		*response = join_resp.packet.code;
+	} else if (join_resp.packet.code == JOIN_PROTOCOL_MISMATCH) {
+		mlog(ML_NOTICE,
+		     "This node requested DLM locking protocol %u.%u and "
+		     "filesystem locking protocol %u.%u.  At least one of "
+		     "the protocol versions on node %d is not compatible, "
+		     "disconnecting\n",
+		     dlm->dlm_locking_proto.pv_major,
+		     dlm->dlm_locking_proto.pv_minor,
+		     dlm->fs_locking_proto.pv_major,
+		     dlm->fs_locking_proto.pv_minor,
+		     node);
+		status = -EPROTO;
+		*response = join_resp.packet.code;
+	} else if (join_resp.packet.code == JOIN_OK) {
+		*response = join_resp.packet.code;
+		/* Use the same locking protocol as the remote node */
+		dlm->dlm_locking_proto.pv_minor =
+			join_resp.packet.dlm_minor;
+		dlm->fs_locking_proto.pv_minor =
+			join_resp.packet.fs_minor;
+		mlog(0,
+		     "Node %d responds JOIN_OK with DLM locking protocol "
+		     "%u.%u and fs locking protocol %u.%u\n",
+		     node,
+		     dlm->dlm_locking_proto.pv_major,
+		     dlm->dlm_locking_proto.pv_minor,
+		     dlm->fs_locking_proto.pv_major,
+		     dlm->fs_locking_proto.pv_minor);
 	} else {
 		status = -EINVAL;
-		mlog(ML_ERROR, "invalid response %d from node %u\n", retval,
-		     node);
+		mlog(ML_ERROR, "invalid response %d from node %u\n",
+		     join_resp.packet.code, node);
 	}
 
 	mlog(0, "status %d, node %d response is %d\n", status, node,
@@ -1008,7 +1104,7 @@
 
 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
 				   struct domain_join_ctxt *ctxt,
-				   enum dlm_query_join_response response)
+				   enum dlm_query_join_response_code response)
 {
 	int ret;
 
@@ -1034,7 +1130,7 @@
 {
 	int status = 0, tmpstat, node;
 	struct domain_join_ctxt *ctxt;
-	enum dlm_query_join_response response = JOIN_DISALLOW;
+	enum dlm_query_join_response_code response = JOIN_DISALLOW;
 
 	mlog_entry("%p", dlm);
 
@@ -1450,10 +1546,38 @@
 }
 
 /*
- * dlm_register_domain: one-time setup per "domain"
+ * Compare a requested locking protocol version against the current one.
+ *
+ * If the major numbers are different, they are incompatible.
+ * If the current minor is greater than the request, they are incompatible.
+ * If the current minor is less than or equal to the request, they are
+ * compatible, and the requester should run at the current minor version.
+ */
+static int dlm_protocol_compare(struct dlm_protocol_version *existing,
+				struct dlm_protocol_version *request)
+{
+	if (existing->pv_major != request->pv_major)
+		return 1;
+
+	if (existing->pv_minor > request->pv_minor)
+		return 1;
+
+	if (existing->pv_minor < request->pv_minor)
+		request->pv_minor = existing->pv_minor;
+
+	return 0;
+}
+
+/*
+ * dlm_register_domain: one-time setup per "domain".
+ *
+ * The filesystem passes in the requested locking version via proto.
+ * If registration was successful, proto will contain the negotiated
+ * locking protocol.
  */
 struct dlm_ctxt * dlm_register_domain(const char *domain,
-			       u32 key)
+			       u32 key,
+			       struct dlm_protocol_version *fs_proto)
 {
 	int ret;
 	struct dlm_ctxt *dlm = NULL;
@@ -1496,6 +1620,15 @@
 			goto retry;
 		}
 
+		if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
+			mlog(ML_ERROR,
+			     "Requested locking protocol version is not "
+			     "compatible with already registered domain "
+			     "\"%s\"\n", domain);
+			ret = -EPROTO;
+			goto leave;
+		}
+
 		__dlm_get(dlm);
 		dlm->num_joins++;
 
@@ -1526,6 +1659,13 @@
 	list_add_tail(&dlm->list, &dlm_domains);
 	spin_unlock(&dlm_domain_lock);
 
+	/*
+	 * Pass the locking protocol version into the join.  If the join
+	 * succeeds, it will have the negotiated protocol set.
+	 */
+	dlm->dlm_locking_proto = dlm_protocol;
+	dlm->fs_locking_proto = *fs_proto;
+
 	ret = dlm_join_domain(dlm);
 	if (ret) {
 		mlog_errno(ret);
@@ -1533,6 +1673,9 @@
 		goto leave;
 	}
 
+	/* Tell the caller what locking protocol we negotiated */
+	*fs_proto = dlm->fs_locking_proto;
+
 	ret = 0;
 leave:
 	if (new_ctxt)