Network architecture

In this chapter we're going to provide a bird's eye view of a family of possible network architectures for Cell that are specifically designed to implement client/server systems. We'll start with a very simple and limited one, and gradually make it more complex and capable. None of this has been implemented yet, and in fact work on this part of the language will only start after version 0.6 has been released, so this is just a design document at present.

The goal of this family of architectures is to automate much of the drudgery involved in implementing distributed software. With conventional languages and tools, creating client-server applications is significantly more challenging than implementing local ones with equivalent functionality. It involves a lot of extra work to write code that is at the same time repetitive and error-prone. But much of that extra effort is unnecessary, as it involves functionality (like, for example, transferring and synchronizing information between clients and servers, persistence, error handling and retry logic) that can be automated with a properly designed network architecture.

Here the developer will be expected to implement the same functionality that would be required for a local, non-distributed application, plus one extra piece of code whose purpose is to synchronize concurrent changes to data that is replicated at different network nodes (we'll examine the details later, but for now think of it as a programmatic version of the rebase operation in a source control system like git). Everything else will be the responsibility of the compiler, which will generate the entire server code and automatically implement all the communication between server and client. Code for client and server can be generated in different languages if need be, and it will actually be possible to generate code for the client in multiple languages, so as to target multiple client platforms.

The basic idea

Let's start with the simplest possible type of network application, one where the various users of the system do not interact with one another. Examples include the website of an online retailer, which allows users to browse the product catalog, place orders and check their status, and single-player online games that allow the same game to be played from multiple devices, which must therefore be kept synchronized.

One thing all these cases have in common is that there's a bunch of per-user data that needs to be persisted and shared between the server and one or more clients. In the case of the online retailer, for example, that may include a user's personal information, their shipping addresses, the contents of their shopping cart and the details of all their orders. For a single-player online game, on the other hand, this per-user state will mostly consist of the persistent part of the game state. Applications like these may end up dealing with huge amounts of data if the system has a large number of users, but the amount of data that is specific to each user is often reasonably small.

In Cell you would start by creating a relational automaton that contains the data that is specific to each particular user, with all the logic/code that is associated with it. Its instances will be mostly independent of one another, in the sense that most of the code we'll be writing will operate on a single instance at a time. Moreover, partitioning the data in per-user chunks of relatively small size will make it feasible to replicate each individual chunk across several network nodes. This is what such a system would look like in its simplest form, if we restrict our attention to a single user:

Here we have the server and two clients, A and B, that are being used by the same user to access the online application. The small boxes labeled S are the replicas of the relational automaton instance that contains the "private" data of the user, and the master copy is the one on the server. The server is charged with persisting the state of each automaton instance. In order to do so, it will rely on an external low-level key/value store, which is not shown in the picture. The clients will also persist their own copy of the data locally, but even if the latter is deleted or lost they will always be able to re-download it from the server. The server will manage the data of all users, and it can consist of a single machine or a cloud.
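
Just to make things a bit more concrete, in the online retailer example S could look something like the sketch below. All the type and relation names here are made up purely for illustration, and a real automaton would of course contain a lot more data and logic:

// Hypothetical identifiers for shipping addresses and products
type AddressId = address_id(Nat);
type ProductId = product_id(Nat);

schema Account {
  // Shipping address #1 of the user is #2
  shipping_address(AddressId, String) [key: 0];

  // The shopping cart contains #2 units of product #1
  cart_item(ProductId, Nat) [key: 0];
}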

The first time a client connects to the server it will download the serialized state of the automaton instance it's interested in. If that instance does not exist yet the server will automatically create it in its default state. Once the data has been downloaded a local replica of S can finally be instantiated.

When the system is at rest, all replicas of S will be in exactly the same state, and the system will be idle, just waiting for the next message to come in. A message can be sent at any time to any of the replicas of S. If the subsequent update is successful, it will be automatically and transparently propagated across the whole system by transmitting a copy of the message (which is just a value, that is, a piece of data) to each of the network nodes, where it will be processed by the local replica. If the update originates at one of the clients, it will be transmitted to the server first, and from there to the other clients. Since updates in Cell are deterministic this is enough to keep the replicas synchronized, as long as there are no concurrency issues (we'll see how to deal with concurrency later).

The server will be responsible for saving the state of the automaton instances when they're not being used, and for reloading them from persistent storage when they are needed again. The whole process will be transparent to the clients, and from the outside it will look like the instances are always in memory. The server will also keep a history of the messages received by each instance, which will always remain available to clients that need to re-synchronize: if a client goes offline for some time, it will be able to bring its local replica up to date by downloading and processing the latest messages locally, without having to re-download the entire state.

Generalizing the architecture

In the previous paragraph we assumed that there's a one-to-one correspondence between automaton instances and users, with each instance being "private" to that particular user. There's of course no particular reason for those limitations.

A user's data could be split into several different automata: in the case of the online retailer, for example, we could have an automaton for things like name, shipping addresses and the contents of the shopping cart, one for open or recent orders, and another one that contains the entire order history. Splitting a user's data into smaller chunks would only make it easier to replicate the corresponding automata across the network.
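
To continue with the hypothetical retailer sketch shown earlier, the Account automaton could keep the personal information, shipping addresses and shopping cart, while the orders could be moved into two separate automata, along these lines (again, all names are made up):

// Hypothetical identifier and state of an order
type OrderId    = order_id(Nat);
type OrderState = placed, paid, shipped;

schema OpenOrders {
  // Order #1 is currently in state #2
  order_state(OrderId, OrderState) [key: 0];
}

schema OrderHistory {
  // Order #1 was completed at time #2
  past_order(OrderId, Time) [key: 0];
}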

There's also no reason why an automaton instance cannot be accessed by multiple users simultaneously. Even in the online retailer example a user's data would have to be accessed inside the company by the shipping or customer service teams, possibly at the same time as the user. In a multiplayer game, a collaborative document editor or a chat app that will obviously be the norm rather than the exception.

We'll also have to account for the fact that there might be automata that cannot be replicated and have to stay on the server, either because they contain a large amount of data that cannot be easily partitioned, or because we want to restrict access to the data they contain for security reasons.

We'll discuss all that (and more) in detail later. But first, let's talk about concurrency.

Concurrency

Everything we've seen so far can be provided automatically and transparently by the compiler: there's no need for the developer to implement the data exchange between client and server, to write the error handling logic, or to manually take data out of a database and put it back in. Up to this point, the implementation of a client/server application does not require any extra work on the part of the developer compared to a local one.

There's one issue though that cannot be hidden by a network architecture like the one we've been discussing and requires the intervention of the developer: concurrency. The problem arises when two clients receive a message at more or less the same time, that is, when the second of the two messages is received before the update triggered by the first one has had time to propagate through the system. Here's an example:

Here the two local replicas of S at clients A and B are initially synchronized (that is, they are in the same initial state S0) but they are sent different messages (Ma and Mb respectively) at more or less the same time, and both updates are successful. At this stage though neither of the two updates is committed yet, because the master replica of S is the one on the server: no update is final until it has been processed by the server.

So the next step for both clients is to transmit their message to the server, so as to complete the (distributed) transaction. Let's say client A gets there first and Ma is appended (and committed) to the server's message history. When client B tries to synchronize its own message history (which contains Mb) with the server's one, the operation will fail because the two histories have now diverged.

This is exactly the same thing that happens with a source control system like git when an attempt to push some local commits to the server fails because someone else has pushed their own commits since the last pull, and the local and remote branches have diverged.

So the first thing for client B to do now is to roll back the update triggered by Mb (which can be done efficiently in Cell), download Ma from the server and send it to its local replica. Once that's done all the replicas will be again in the same state (let's call this new state S1).

What should client B now do with Mb? Just discarding it is unacceptable, for obvious reasons. But trying to re-send it wouldn't work either, at least not in general: if the updates triggered by Ma and Mb touch the same data, that could lead to (among other things) race conditions and data loss.

Since the system cannot provide a way to reconcile concurrent updates that is both general and sound, the only acceptable thing to do here is to have the developer specify explicitly how the conflict ought to be resolved.

To that purpose, the developer will have to implement a function (conceptually, at least: the exact mechanism will be a bit different) that takes as input S0, Ma and Mb and returns a list of messages (let's call it Ms) that are meant to replace Mb. At this point Mb will be discarded and the messages in Ms will be sent to S (which is now in state S1).

This way the developer will be able to produce any desired behavior. If for example they wanted to just discard Mb, all they would have to do is to return an empty list, and if they instead wanted to just re-send it, which is an acceptable thing to do when Ma and Mb don't touch the same data, they would return a list whose only element is Mb.

The trickiest case to handle is when Ma and Mb try to update the same pieces of data, or, more generally, when they somehow interfere with each other. This is the case where the developer will have to implement some application-specific logic for combining the two updates. We'll see an example of that in the next paragraph.

Building an online spreadsheet

In order to illustrate how the proposed mechanism will work, we'll now walk through a simple example. Let's say we want to create an online spreadsheet, which will allow different users to simultaneously work on the same data. We'll start by creating a local, single-user version of (a small piece of) the application, and then turn it into a multi-user, distributed one that allows different users to concurrently edit the same sheets, and also to work offline. In super-simplified form, our data structures could look like this:

// Data cells may contain numbers, dates or strings
// Calculated cells will contain a formula
type Content = Int, Float, Date, String, Formula;

// Every cell is identified by row and column
type CellId = (Nat, Nat);

// An individual sheet
schema Sheet {
  // The content of cell #1 is #2
  cell(CellId, Content) [key: 0];
}

// Updating the content of a cell
Sheet.set_cell_content(row: Nat, col: Nat, content: Content) {
  id = (this.row, this.col);
  update cell(id, this.content);
}

The Sheet automaton represents an individual sheet. Our system will contain any number of its instances, each of them identified by a key, which can be any arbitrary Cell value. Each entry in Sheet's only relation variable, cell(..), stores the position (row and column) and content of an individual cell.
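
For instance, and this is just an arbitrary choice made for illustration's sake, the keys could be values of a hypothetical type like this one:

// A possible type for the keys that identify the instances of Sheet
type SheetId = sheet_id(Nat);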

How can we turn this local application into a distributed one? First of all, we need to decide what should happen when two different users try to edit the same cell of the same spreadsheet at the same time. Here there are at least two main use cases we need to deal with.

The simplest one is when the users are collaboratively editing the sheet in real time. Imagine for example that (using the terminology of the previous paragraph) users A and B try to edit the same cell (let's say it's (5, 2)), whose initial content is the string "abc". Furthermore, let's say user A types in a 'd', while user B types in an 'e'. That would result in user A's replica of S being sent the message Ma = set_cell_content(row: 5, col: 2, content: "abcd"), and user B's replica being sent Mb = set_cell_content(row: 5, col: 2, content: "abce"). As before we'll assume that A manages to commit the update, and that B has to resolve the editing conflict. In a case like this, it's acceptable to just discard Mb: user B might lose the last few seconds of work, but that's not a big deal. So our rebase function would look more or less like this (note that this is just pseudocode: the actual syntax hasn't been finalized yet, but it will certainly be different):

// Rebasing msg_b on top of msg_a
Sheet rebase msg_a msg_b {
  // If msg_a and msg_b affect the same cell,
  // we just discard the latter.
  if msg_a.row == msg_b.row and msg_a.col == msg_b.col
    return ();

  // If there's no conflict, we just send msg_b unchanged
  return (msg_b);
}

In order to make the application more user-friendly, we could decide to highlight the cells that have been recently changed by other users. To that purpose, we could store for every cell the list of clients who updated it, and the time of their last update. The UI would then use that information to somehow show the activity of the other concurrent users:

schema Sheet {
  // The content of cell #1 is #2
  cell(CellId, Content) [key: 0];

  // The time of the last edit made by client #2 to cell #1 was #3
  last_edited(CellId, ClientId, Time) [key: 0:1];

  // The time of the last discarded edit made by client #2 to cell #1 was #3
  last_discarded(CellId, ClientId, Time) [key: 0:1];
}

// Updating the content of a cell
Sheet.set_cell_content(row: Nat, col: Nat, content: Content, client: ClientId, time: Time) {
  id = (this.row, this.col);
  update cell(id, this.content), last_edited(id, this.client, this.time);
}

// Recording the fact that an edit was discarded
Sheet.edit_was_discarded(row: Nat, col: Nat, client: ClientId, time: Time) {
  id = (this.row, this.col);
  update last_discarded(id, this.client, this.time);
}

In the above code we're also keeping track of which cells lost data, so that the UI can somehow highlight those cells as well. Our rebase function would now look like this:

Sheet rebase msg_a msg_b {
  if msg_a.row == msg_b.row and msg_a.col == msg_b.col {
    // Keeping track of the fact that we discarded an edit
    msg = edit_was_discarded(
      row:    msg_b.row,
      col:    msg_b.col,
      client: msg_b.client,
      time:   msg_b.time
    );
    return (msg);
  }

  return (msg_b);
}

Working offline

Dealing with concurrency becomes significantly more complex if we decide to support offline work. In this case, we cannot afford to just discard Mb whenever a conflict arises: if a user spends a significant amount of time working offline, they wouldn't be too happy to lose all their work just because another user made a change to the same sheet in the meantime.

One possible approach here is to discard Mb only if it's a very recent change, that is, one made in the last few seconds. If it's not, we could just keep both versions of the content, and let users reconcile them as they see fit. In order to do so, we would have to add a new case to Content:

type BasicContent       = Int, Float, Date, String, Formula;
type UnresolvedContent  = unresolved_content([+ClientId -> BasicContent]);
type Content            = BasicContent, UnresolvedContent;

A cell can now be flagged as "unresolved": that means that it was edited concurrently by different clients, and one of them is now expected to manually reconcile those conflicting changes. The UI would have to highlight any unresolved cell, and provide a mechanism for the user to visualize the "diverged" values and to somehow merge them. The rebase function would now look like this:

Sheet rebase msg_a msg_b {
  if msg_a.row == msg_b.row and msg_a.col == msg_b.col {
    // Retrieving the current time
    now = Now();

    // If the change is very recent we can afford to discard it
    if is_very_recent(msg_b.time, now) {
      msg = edit_was_discarded(
        row:    msg_b.row,
        col:    msg_b.col,
        client: msg_b.client,
        time:   msg_b.time
      );
      return (msg);
    }

    // Otherwise we flag the cell as unresolved,
    // and we keep both versions of the content
    content = merge(
      msg_a.content,
      msg_a.client,
      msg_b.content,
      msg_b.client
    );

    msg = set_cell_content(
      row:      msg_b.row,
      col:      msg_b.col,
      content:  content,
      client:   msg_b.client,
      time:     msg_b.time
    );

    return (msg);
  }

  return (msg_b);
}
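
The rebase function above also makes use of an is_very_recent(..) predicate that we haven't defined. Its implementation will depend on the exact representation of Time values; assuming a hypothetical seconds(..) function that converts a Time into a number of seconds since some fixed point in time, it could be as simple as this (the 30-second threshold is of course arbitrary):

// An edit counts as "very recent" if it was made less than 30 seconds ago
// seconds(..) is a hypothetical conversion function, used here just as a sketch
Bool is_very_recent(Time time, Time now) = seconds(now) - seconds(time) < 30;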

This is how the merge(..) function could be implemented:

Content merge(BasicContent c1, ClientId id1, BasicContent c2, ClientId id2) =
  unresolved_content([
    id1 -> c1 if id1 != id2,
    id2 -> c2
  ]);

Content merge(UnresolvedContent c1, ClientId id1, BasicContent c2, ClientId id2) =
  unresolved_content(
    [id -> c : id c <- untag(c1), id != id2] & [id2 -> c2]
  );

// No need to handle the case where msg_b.content is of type
// UnresolvedContent, as this is not supposed to happen: any
// local edit would lead to the content being of type BasicContent.
// Only the rebase function can produce a value of type UnresolvedContent
Content merge(Content c1, ClientId id1, UnresolvedContent c2, ClientId id2) =
  undefined;

In this particular case, a proper rebasing was necessary only when working offline, but in general it will be required for online activity as well. An example of that is a chat app: users will be sending messages to the same chat group or forum at the same time, and the rebase function will have to ensure that no message is ever lost. A more complex example is a multiplayer game, which might require a sophisticated way of combining the concurrent actions of different players.
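
Going back to the chat example, the rebase logic there could be almost trivial: assuming that posting a message just inserts a new entry into the chat history, two concurrent posts never really interfere with each other, and msg_b can simply be re-sent unchanged. Using the same pseudocode notation as before, for a hypothetical ChatGroup automaton:

ChatGroup rebase msg_a msg_b {
  // Concurrent posts don't conflict, so the rebased
  // message is just msg_b itself
  return (msg_b);
}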

To be continued...