Multi-Process Development Model Enhancement

The previous Multi-Process Model chapter covered the framework's multi-process model in detail. Its Agent process suits a common class of scenarios: some middleware clients need to keep a persistent connection to a server. Ideally, only one persistent connection is established with each server. With the multi-process model, however, every Worker creates its own connection, so the number of connections is multiplied by n (n = number of Worker processes).

+--------+   +--------+
| Client |   | Client |   ... n
+--------+   +--------+
    |  \     /   |
    |    \ /     |        n * m links
    |    / \     |
    |  /     \   |
+--------+   +--------+
| Server |   | Server |   ... m
+--------+   +--------+
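
The arithmetic behind the diagram can be sketched in a few lines. The process and server counts below are hypothetical values chosen only for illustration:

```javascript
// Count persistent connections for a number of client processes and servers.
function connectionCount(clientProcesses, servers) {
  // Every client process opens its own connection to every server.
  return clientProcesses * servers;
}

const workers = 4; // hypothetical number of Worker processes
const servers = 3; // hypothetical number of server nodes

console.log(connectionCount(workers, servers)); // 12: one connection per Worker per server
console.log(connectionCount(1, servers));       // 3: a single process holds the connections
```

Moving the connections into one process removes the factor n, which is the motivation for the rest of this chapter.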

To reuse persistent connections as much as possible (they are a valuable resource for the server), we maintain them in the Agent process and transmit data to each Worker via messenger. This is feasible, but it often requires a lot of code to encapsulate the interface and implement the data transmission, which is tedious.

In addition, transmitting data via messenger is relatively inefficient, because messenger relays everything through the Master. If the IPC channel goes wrong, it could even bring the Master process down.

Is there a better way? Yes. We provide a new model that reduces the complexity of encapsulating this type of client. The new model bypasses the Master by establishing a direct socket connection between the Agent and each Worker. The Agent acts as the external interface and maintains the connection shared by the Worker processes.

# Core idea

Under the new model, the client communicates as follows:

             +-------+
             | start |
             +---+---+
                 |
        +--------+---------+
      __| port competition |__
win /   +------------------+  \ lose
   /                           \
+---------------+     tcp conn     +-------------------+
| Leader(Agent) |<---------------->| Follower(Worker1) |
+---------------+                  +-------------------+
    |            \ tcp conn
    |             \
+--------+         +-------------------+
| Client |         | Follower(Worker2) |
+--------+         +-------------------+
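
The port competition step in the diagram can be illustrated with Node's built-in net module. This is a minimal sketch, not the framework's implementation: the function names competeForPort and demo are hypothetical, and it assumes that "winning" simply means being the first process to listen on the port.

```javascript
const net = require('net');

// Try to claim the communication port. The winner becomes the Leader;
// a caller that finds the port already taken becomes a Follower.
function competeForPort(port, host = '127.0.0.1') {
  return new Promise((resolve, reject) => {
    const server = net.createServer();
    server.once('error', err => {
      if (err.code === 'EADDRINUSE') {
        resolve({ role: 'follower', server: null });
      } else {
        reject(err);
      }
    });
    server.listen(port, host, () => resolve({ role: 'leader', server }));
  });
}

// Run two competitors in one process to show both outcomes.
async function demo() {
  const first = await competeForPort(0); // port 0 lets the OS pick a free port
  const port = first.server.address().port;
  const second = await competeForPort(port); // same port: already in use
  first.server.close();
  return [ first.role, second.role ];
}
```

Calling demo() resolves with the roles of the two competitors, the first being the Leader and the second a Follower.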

# Client interface type abstraction

We abstract client interfaces into two broad categories, subscribe/publish and invoke, which together serve as a specification for the client interface. A client that conforms to this specification can be wrapped automatically in Leader / Follower mode.

Client example

const Base = require('sdk-base');

class Client extends Base {
  constructor(options) {
    super(options);
    // remember to invoke ready after initialization is successful
    this.ready(true);
  }

  /**
   * Subscribe
   *
   * @param {Object} info - subscription information (a JSON object, try not to include attributes such as Function, Buffer, Date)
   * @param {Function} listener - monitoring callback function, receives a parameter as the result of monitoring
   */
  subscribe(info, listener) {
    // ...
  }

  /**
   * Publish
   *
   * @param {Object} info - publishing information, which is similar to that of subscribe described above
   */
  publish(info) {
    // ...
  }

  /**
   * Get data (invoke)
   *
   * @param {String} id - id
   * @return {Object} result
   */
  async getData(id) {
    // ...
  }
}

# Exception handling

# Protocol and invocation sequence

Leader and Follower exchange data via the following protocol:

 0       1       2               4                                                              12
 +-------+-------+---------------+---------------------------------------------------------------+
 |version|req/res|   reserved    |                          request id                           |
 +-------------------------------+-------------------------------+-------------------------------+
 |           timeout             |   connection object length    |   application object length   |
 +-------------------------------+---------------------------------------------------------------+
 |         conn object (JSON format)  ...                    |            app object             |
 +-----------------------------------------------------------+                                   |
 |                                                  ...                                          |
 +-----------------------------------------------------------------------------------------------+

The invocation sequence is as follows:

1. The Leader starts a Local Server on the communication port; all Leaders and Followers communicate through it.
2. After a Follower connects to the Local Server, it first sends a register channel packet (the channel concept distinguishes between different types of clients).
3. The Local Server assigns the Follower to a specific Leader, matched by client type.
4. The Follower sends subscribe and publish requests to the Leader.
5. When subscribed data changes, the Leader notifies the Follower with a subscribe result packet.
6. The Follower sends an invoke request to the Leader. The Leader performs the corresponding operation and returns the result.

 +----------+             +---------------+          +---------+
 | Follower |             | Local Server  |          | Leader  |
 +----------+             +---------------+          +---------+
      |     register channel     |       assign to        |
      + -----------------------> |  --------------------> |
      |                          |                        |
      |                                subscribe          |
      + ------------------------------------------------> |
      |                                 publish           |
      + ------------------------------------------------> |
      |                                                   |
      |       subscribe result                            |
      | <------------------------------------------------ +
      |                                                   |
      |                                 invoke            |
      + ------------------------------------------------> |
      |          invoke result                            |
      | <------------------------------------------------ +
      |                                                   |
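
To make the packet layout more concrete, here is a minimal sketch of encoding and decoding such a packet with Node's Buffer. The field widths are read off the header diagram (1 + 1 + 2 + 8 + 4 + 4 + 4 = 24 bytes). Everything else is an assumption for illustration: the function names, big-endian byte order, 0/1 as the request/response flag, and JSON for the application object. The real wire format may differ.

```javascript
// Header size derived from the diagram: 1 + 1 + 2 + 8 + 4 + 4 + 4 bytes.
const HEADER_LENGTH = 24;

function encodePacket({ version, isResponse, id, timeout, connObj, appObj }) {
  const connBuf = Buffer.from(JSON.stringify(connObj));
  const appBuf = Buffer.from(JSON.stringify(appObj)); // assumption: JSON here too
  const header = Buffer.alloc(HEADER_LENGTH);
  header.writeUInt8(version, 0);
  header.writeUInt8(isResponse ? 1 : 0, 1); // assumed flag: 0 = request, 1 = response
  // bytes 2-3 are reserved and stay zero
  header.writeBigUInt64BE(BigInt(id), 4);   // 8-byte request id
  header.writeUInt32BE(timeout, 12);
  header.writeUInt32BE(connBuf.length, 16); // connection object length
  header.writeUInt32BE(appBuf.length, 20);  // application object length
  return Buffer.concat([ header, connBuf, appBuf ]);
}

function decodePacket(buf) {
  const connLength = buf.readUInt32BE(16);
  const appLength = buf.readUInt32BE(20);
  const connStart = HEADER_LENGTH;
  const appStart = connStart + connLength;
  return {
    version: buf.readUInt8(0),
    isResponse: buf.readUInt8(1) === 1,
    id: Number(buf.readBigUInt64BE(4)),
    timeout: buf.readUInt32BE(12),
    connObj: JSON.parse(buf.toString('utf8', connStart, appStart)),
    appObj: JSON.parse(buf.toString('utf8', appStart, appStart + appLength)),
  };
}
```

The two length fields let the receiver split a byte stream into packets: once the 24-byte header has arrived, it knows exactly how many more bytes belong to the current packet.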

# Usage

The following simple example shows how to make a client support Leader / Follower mode in the framework.

// registry_client.js
const URL = require('url');
const Base = require('sdk-base');

class RegistryClient extends Base {
  constructor(options) {
    super({
      // Specify a method for asynchronous start
      initMethod: 'init',
    });
    this._options = options;
    this._registered = new Map();
  }

  /**
   * Start logic
   */
  async init() {
    this.ready(true);
  }

  /**
   * Get configuration
   * @param {String} dataId - the dataId
   * @return {Object} configuration
   */
  async getConfig(dataId) {
    return this._registered.get(dataId);
  }

  /**
   * Subscribe
   * @param {Object} reg
   * - {String} dataId - the dataId
   * @param {Function} listener - the listener
   */
  subscribe(reg, listener) {
    const key = reg.dataId;
    this.on(key, listener);

    const data = this._registered.get(key);
    if (data) {
      process.nextTick(() => listener(data));
    }
  }

  /**
   * publish
   * @param {Object} reg
   * - {String} dataId - the dataId
   * - {String} publishData - the publish data
   */
  publish(reg) {
    const key = reg.dataId;
    let changed = false;

    if (this._registered.has(key)) {
      const arr = this._registered.get(key);
      if (arr.indexOf(reg.publishData) === -1) {
        changed = true;
        arr.push(reg.publishData);
      }
    } else {
      changed = true;
      this._registered.set(key, [reg.publishData]);
    }
    if (changed) {
      this.emit(key, this._registered.get(key).map(url => URL.parse(url, true)));
    }
  }
}

module.exports = RegistryClient;

// agent.js
const RegistryClient = require('./registry_client');

module.exports = agent => {
  // wrap and instantiate RegistryClient
  agent.registryClient = agent.cluster(RegistryClient)
    // the argument of create() is passed to the RegistryClient constructor
    .create({});

  agent.beforeStart(async () => {
    await agent.registryClient.ready();
    agent.coreLogger.info('registry client is ready');
  });
};

// app.js
const RegistryClient = require('./registry_client');

module.exports = app => {
  app.registryClient = app.cluster(RegistryClient).create({});
  app.beforeStart(async () => {
    await app.registryClient.ready();
    app.coreLogger.info('registry client is ready');

    // call subscribe to listen for changes
    app.registryClient.subscribe({
      dataId: 'demo.DemoService',
    }, val => {
      // ...
    });

    // call publish to publish data
    app.registryClient.publish({
      dataId: 'demo.DemoService',
      publishData: 'xxx',
    });

    // call the getConfig interface
    const res = await app.registryClient.getConfig('demo.DemoService');
    console.log(res);
  });
};

Isn't it simple?

Of course, if your client is not so "standard", you may need some other APIs. For example, your subscription method may be called sub rather than subscribe:

const Base = require('sdk-base');

class MockClient extends Base {
  constructor(options) {
    super({
      initMethod: 'init',
    });
    this._options = options;
    this._registered = new Map();
  }

  async init() {
    this.ready(true);
  }

  sub(info, listener) {
    const key = info.dataId;
    this.on(key, listener);

    const data = this._registered.get(key);
    if (data) {
      process.nextTick(() => listener(data));
    }
  }

  ...
}

You need to set it manually with the delegate API:

// agent.js
const MockClient = require('./mock_client'); // assumed path to the class above

module.exports = agent => {
  agent.mockClient = agent.cluster(MockClient)
    // delegate sub to the subscribe logic
    .delegate('sub', 'subscribe')
    .create();

  agent.beforeStart(async () => {
    await agent.mockClient.ready();
  });
};

// app.js
const MockClient = require('./mock_client'); // assumed path to the class above

module.exports = app => {
  app.mockClient = app.cluster(MockClient)
    // delegate sub to the subscribe logic
    .delegate('sub', 'subscribe')
    .create();

  app.beforeStart(async () => {
    await app.mockClient.ready();

    app.mockClient.sub({ dataId: 'test-id' }, val => {
      // put your code here
    });
  });
};
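
One way to picture what delegate does is as a lookup table from method names to the interface category the wrapper should apply. The sketch below is purely illustrative and is not cluster-client's implementation: the class DelegateTable and its kindOf method are hypothetical, and treating every unregistered method as an invoke is a simplifying assumption.

```javascript
// Hypothetical illustration: map method names onto the interface
// categories (subscribe / publish / invoke) that a wrapper can proxy.
class DelegateTable {
  constructor() {
    // The standard names are recognised by default.
    this._kinds = new Map([
      [ 'subscribe', 'subscribe' ],
      [ 'publish', 'publish' ],
    ]);
  }

  // Treat `method` as if it were the standard interface `kind`.
  delegate(method, kind) {
    this._kinds.set(method, kind);
    return this; // allow chaining, like the fluent calls above
  }

  // Simplifying assumption: anything unregistered is an invoke.
  kindOf(method) {
    return this._kinds.get(method) || 'invoke';
  }
}

const table = new DelegateTable().delegate('sub', 'subscribe');
console.log(table.kindOf('sub'));       // subscribe
console.log(table.kindOf('getConfig')); // invoke
```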

As we have seen, cluster-client lets us develop a "pure" RegistryClient without understanding the multi-process model. We can focus solely on interacting with the server and, with a simple wrapper from cluster-client, obtain a ClusterClient that supports the multi-process model. The RegistryClient here is in effect a DataClient dedicated to data communication with the remote service.

You may have noticed that ClusterClient also brings several constraints. To expose the same methods to every process, RegistryClient can support only the sub/pub pattern and asynchronous API calls, because all interactions in the multi-process model go through socket communication.
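
One consequence of going through a socket is that arguments and results have to be serialized. The sketch below uses plain JSON as a stand-in for the real serializer (an assumption for illustration only; the function name sendAcrossProcesses is hypothetical) to show why subscription information should stay a simple data object:

```javascript
// Plain JSON stands in for the real serializer here (illustration only).
function sendAcrossProcesses(payload) {
  return JSON.parse(JSON.stringify(payload));
}

const info = {
  dataId: 'demo.DemoService',
  createdAt: new Date(0),
  onChange() {},
};

const received = sendAcrossProcesses(info);
console.log(received.dataId);           // demo.DemoService
console.log(typeof received.createdAt); // string: the Date did not survive as a Date
console.log('onChange' in received);    // false: functions are dropped
```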

Suppose we want to implement a synchronous get method that keeps subscribed data in memory and returns it directly. How can we achieve that? Real-world requirements may be even more complicated.

Here we introduce a best practice, the APIClient. For modules that need synchronous APIs, such as reading cached data, wrap the RegistryClient in an APIClient that implements the APIs unrelated to interaction with the remote server. The APIClient instance is what gets exposed to users.

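Inside the APIClient, asynchronous operations such as subscribe and publish are delegated to the ClusterClient-wrapped RegistryClient, while synchronous reads are served from data cached in memory.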

For example, add a synchronous get method backed by an in-memory cache to the APIClient module:

// some-client/index.js
const Base = require('sdk-base');
const cluster = require('cluster-client');
const RegistryClient = require('./registry_client');

class APIClient extends Base {
  constructor(options) {
    super(options);

    // options.cluster lets an Egg plugin pass in app.cluster
    this._client = (options.cluster || cluster)(RegistryClient).create(options);
    this._client.ready(() => this.ready(true));

    this._cache = {};

    // subMap:
    // {
    //   foo: reg1,
    //   bar: reg2,
    // }
    const subMap = options.subMap;

    for (const key in subMap) {
      this.subscribe(subMap[key], value => {
        this._cache[key] = value;
      });
    }
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }

  // synchronous read from the in-memory cache
  get(key) {
    return this._cache[key];
  }
}

// finally, the module exposes this APIClient
module.exports = APIClient;

Then we can use this module like this:

// app.js || agent.js
const APIClient = require('some-client'); // the module above

module.exports = app => {
  const config = app.config.apiClient;
  app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster }));
  app.beforeStart(async () => {
    await app.apiClient.ready();
  });
};

// config.${env}.js
exports.apiClient = {
  subMap: {
    foo: {
      id: '',
    },
    // bar...
  },
};
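
The caching idea can be tried out without the framework. The sketch below is a simplified, in-process illustration: FakeDataClient and CachedClient are hypothetical names, and an EventEmitter stands in for the ClusterClient-wrapped RegistryClient, so no sockets are involved.

```javascript
const EventEmitter = require('events');

// Stand-in for the wrapped data client (hypothetical; no sockets involved).
class FakeDataClient extends EventEmitter {
  subscribe(reg, listener) {
    this.on(reg.dataId, listener);
  }

  publish(reg) {
    this.emit(reg.dataId, reg.publishData);
  }
}

class CachedClient {
  constructor(dataClient, subMap) {
    this._client = dataClient;
    this._cache = {};
    // Keep the latest value of every subscribed key in memory.
    for (const key of Object.keys(subMap)) {
      this._client.subscribe(subMap[key], value => {
        this._cache[key] = value;
      });
    }
  }

  publish(reg) {
    this._client.publish(reg);
  }

  // Synchronous read: returns undefined until the first push arrives.
  get(key) {
    return this._cache[key];
  }
}

const client = new CachedClient(new FakeDataClient(), {
  foo: { dataId: 'demo.Foo' },
});
console.log(client.get('foo')); // undefined
client.publish({ dataId: 'demo.Foo', publishData: 'v1' });
console.log(client.get('foo')); // v1
```

The trade-off is that get can return undefined or a slightly stale value, since the cache is only as fresh as the last subscription push.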

To make it easier to encapsulate an APIClient, the cluster-client module provides an APIClientBase base class. The APIClient above can then be rewritten as:

const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');

class APIClient extends APIClientBase {
  // return the original client class
  get DataClient() {
    return RegistryClient;
  }

  // used to set the cluster-client related parameters, equivalent to the second parameter of the cluster method
  get clusterOptions() {
    return {
      responseTimeout: 120 * 1000,
    };
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }

  get(key) {
    return this._cache[key];
  }
}

To summarize:

+------------------------------------------------+
| APIClient                                      |
|       +----------------------------------------|
|       | ClusterClient                          |
|       |      +---------------------------------|
|       |      | RegistryClient                  |
+------------------------------------------------+

Readers who are interested may have a look at the discussion of the enhanced multi-process development model.

/**
 * @property {Number} responseTimeout - response timeout, default is 60000
 * @property {Transcode} [transcode]
 *   - {Function} encode - custom serialize method
 *   - {Function} decode - custom deserialize method
 */
config.clusterClient = {
  responseTimeout: 60000,
};

| Configuration Item | Type | Default | Description |
| --- | --- | --- | --- |
| responseTimeout | number | 60000 (one minute) | Global interprocess communication timeout. Do not set it too short, because the proxied interface itself has a timeout setting |
| transcode | function | N/A | Serialization for interprocess communication; serialize-json by default (setting it manually is not recommended) |

The above is the global configuration. To override it for a single client, pass the options as the second argument of app.cluster:

app.registryClient = app.cluster(RegistryClient, {
  responseTimeout: 120 * 1000, // options passed here are for cluster-client
}).create({
  // options required by RegistryClient go here
});

If the client is built on APIClientBase, override the clusterOptions getter instead:

const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');

class APIClient extends APIClientBase {
  get DataClient() {
    return RegistryClient;
  }

  get clusterOptions() {
    return {
      responseTimeout: 120 * 1000,
    };
  }

  // ...
}

module.exports = APIClient;
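
How the global and per-client settings combine can be pictured as a shallow merge. This is a sketch under the assumption that per-client options take precedence over the global config; the function name resolveClusterOptions is hypothetical and not part of the framework.

```javascript
// Assumption: per-client options take precedence over the global config.
function resolveClusterOptions(globalConfig, clientOptions) {
  return Object.assign({}, globalConfig, clientOptions);
}

const globalConfig = { responseTimeout: 60000 };

// No override: the global default applies.
console.log(resolveClusterOptions(globalConfig, {}).responseTimeout); // 60000

// Per-client override, as in the examples above.
console.log(resolveClusterOptions(globalConfig, { responseTimeout: 120 * 1000 }).responseTimeout); // 120000
```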