Concepts

Overview

The Plexus Interop project aims to define an open standard for desktop application interoperability and provides reference implementation of this standard. It formalizes connections between applications within a single user session on client desktop through a central broker.

The main goal is to enable development of extensible workflows connecting independent apps developed by different organizations in different technologies (.NET, Web, Java, Python, etc.) by passing relevant context (structured data objects) between those apps.

Although the industry is moving towards containerized desktop applications, Plexus Interop recognizes that much of the application landscape is not yet tied to containers and is architecturally agnostic of container implementation. Separating interoperability from the container provides notable advantages: different containers can be leveraged in the same workflow, and launched applications residing outside of containers can participate in interop activities.

Technically speaking, Plexus Interop is a metadata-centric language-agnostic desktop app-to-app interoperability framework with an extensible model for launching new instances of desktop applications on demand. Its extensibility is based on open standards which are essential parts of the project. The architecture is based around central broker providing hub-and-spoke connectivity between apps and brokering strongly-typed RPC-style calls between them. The broker has a connection to an Application Lifecycle Manager (or Launcher) which is capable of creating new instances of apps based on the their runtime-metadata (e.g. container type, launch command, command line parameters) defined in a registry.

Key components and high level architecture

Components

Plexus Interop consist of the following main API layers:

desktop interop layers

All Applications are connected to central Broker. To participate in interop each Application need to define itself in Interop Registry and auto-generate its specific client API using code-generation tool. Broker loads Interop Registry on startup and re-loads when it is changed in runtime to discover available actions, validate and route calls between Applications.

Broker

Broker is a process running on user desktop which handles application connections, routes calls between them and able to launch new applications instances on demand.

diag d48af40d9d064a753bde0a694854746c

Interop Broker has several responsibilities:

  • It reads full information about all interoperability APIs from Interop Registry

  • It routes interop API calls and replies between app instances

  • It tracks which application instances are alive. If some app calls a service which is not online yet then Broker launches service provider app via App Lifecycle Manager.

Clients

Clients are technology-specific libraries used by Applications to communicate with Broker. Currently Plexus Interop has 2 types of clients: for .NET and for TypeScript/JavaScript.

Clients communicate with Broker using Interop Protocol. Protocol messages are sent via Transport Protocol. Broker can support one or more Interop and Transport protocols and Client can choose which of them to use.

Interop DSL

Interop DSL is a language for defining contracts between Applications, and associated tooling for parsing, validating and code-generating of application-specific client APIs from these definitions.

Features

  • Communicate across desktop applications. Allows communication between apps running in different processes, developed by different teams, and in different programming languages.

  • Decouple desktop apps from each other and discover services dynamically. All interop calls are going through the central broker which works as a mediator decoupling apps from each other. Apps can discover each other in run time using different search criteria.

  • Launch apps on demand. Interop broker tracks the lifecycle of desktop applications, i.e. it knows whether app is running or not and can launch app when required.

  • Define strongly-typed interop APIs. Reference implementation uses popular Google Protobuf format for defining APIs and serializing messages. Strong-typing and using of industry-wide standard allows easy integration between different apps.

  • Store APIs in central repository, control their usage and evolution. All API definitions, as well as explicit per-app declaration of provided and consumed services, are supposed to be stored in the central interop repository. Central repository allows defining standards for communicating apps and common services on user desktop, granular control on usage and evolution of APIs.

  • Use different messaging patterns. Supports request-response RPC pattern for common cases, as well as broadcast and bi-directional streaming patterns for more complex scenarios.

  • Easy deploy and distribute into enterprise environments. Deployment is very simple, does not require administrative permissions, supports all major enterprise operating systems and terminal services.

  • Communicate securely using different transports. Uses local-only communication channels. Two transports are supported at the moment: "websockets" transport best suited for web apps and "named pipes" best suited for desktop apps.

  • Extend where required. Can be adjusted to your needs via extension points. It’s possible to introduce new interop transport and serialization protocols, support custom authentication and authorization, different app startup flows and new programming languages.

See Guides section for the more detailed description of what is Plexus Interop and how it works.

Definitions

This section contains overview diagram which shows Plexus Interop layers on example of 2 applications interoperating through broker. Below this diagram you can find brief descriptions for each of the presented concepts.

desktop interop concepts
diag e24370232956d904dd9be81530041a27

Application

Application is a program on client desktop which can be started and stopped. Each application can provide (i.e. implement) services and/or consume (i.e. call) services provided by other applications.

Application is not necessary correspond to OS process. It can be a part of process, or consist of many processes.

Examples of applications:

  • A program with GUI

  • A program without GUI, e.g. Windows Service

  • A part of process, e.g. a plugin running in a big container application consisting of many isolated plugins

  • A web window or worker running in Electron.js or other web container

Application Instance

Application instance is a running (started) application. There can be many instances of the same application running simultaneously on client desktop.

Interop Registry

Interop registry defines strong-typed contracts between applications on client desktop using a special human-readable interface definition language.

It consist of definitions of the following types:

  • Message definitions represent types of data which can be sent between applications.

  • Service definitions represent named sets of related methods which can be implemented by applications. Service can contain one or many methods and each method is defined by its name, type, and types of request and response.

  • Application definitions declare which service implementations are provided and which consumed by each application participating in desktop interop.

Broker loads Interop Registry from server on launch and re-loads it on changes. This allows adding new services to user desktops in runtime.

Interop Registry enables dynamic service discovery. Applications are only connected to broker and call services rather than other applications. This concept is similar to dynamic injection containers available in almost any language and framework, but here it is used on a higher level. Broker dynamically searches for a suitable service implementation across all the applications in Interop Registry. Because of this applications are always de-coupled from each other, they don’t need to know about counter-parties implementation details and they can release separately.

Broker

Broker is the central part of desktop interop. Broker accepts, authenticates and tracks application (client) connections, validates and routes calls between them, and optionally can trigger launch of application on demand through App Lifecycle Manager.

Broker listens to one or many wire protocol servers to accept client connections. Applications can use any of the listened wire protocol implementations to establish connection with broker.

Application instance is considered "online" while it has at least one established connection with broker. Broker uniquely identifies each connection and tracks it, so it always knows which apps are online in any given time.

Online applications instances can send protocol requests to broker to discover or call service implementations provided by other applications. Broker validates and routes such calls to target applications according to interop registry.

Application Lifecycle Manager

Component inside Broker which tracks active connections and can trigger launch of an application on demand by sending a request to the appropriate Application Launcher.

Application Launcher

Application launchers are applications which can launch other applications. This concept was introduced to allow easy implementation of custom launch workflows which can be very different. E.g. web application is loaded in a browser window, native application started as a new OS process and plugin is loaded by its container application. In addition each vendor usually has custom launch steps like auto-updating or accepting terms & conditions. Such steps can also be implemented in a vendor-specific app launcher.

On method call broker can either route the call to an online application instance or trigger launch of a new instance. There are several strategies of routing launch requests from Broker to App Launcher, called Launch Behaviors:

  • Default - Pass invocation to already running provider instance or launch new one if no online providers found.

  • Always - Create new app instance on each invocation.

  • Never - Pass invocation to already running instance, return error if no online providers found.

Launch Behaviors can be specified per action in Interop Registry, please see example below:

application RateProviderApp {
    provides CcyPairBroadcastService {
        option launch_on_call = ALWAYS;
        GetRate;
    }
}

Each application can specify its Launcher in Interop Registry and broker will delegate launch to the specified Launcher.

Client

Client is a library used by application to communicate with interop broker. Client can be implemented in any language, it communicates with broker using interop protocol, transport protocol and wire protocol.

Method

Like many other Remote Procedure Call (RPC) systems, Plexus Interop is based around the idea of defining a service by specifying the methods that can be called remotely with their parameters and return types.

There are 4 possible types of RPC methods in Plexus Interop mirroring types supported by gRPC:

  • Unary - consumer send a request, provider receives it, handles, and sends response back to the consumer.

  • Consumer streaming - consumer sends one or many requests, provider sends only one response back.

  • Provider streaming - consumer sends one request, provider sends many responses back.

  • Bidirectional streaming - consumer sends many requests, provider sends many responses.

Discovery

Ability to dynamically discover service implementations. It allows using services in a decoupled way when consumer might not even know where the called service is implemented.

Step one: find API implementations:

diag c1232bd1e1db9ff2851b63035ddd135d

Step two: select between API implementations

Step three: call specific API implementer:

diag 52f1bb1b83534f00e415763f59166d7b

Client Message Marshaller

Pluggable component used by client library to serialize and deserialize request/response messages.

Generic Client API

Technology-specific API implementation which provides core functionality of interop client. It converts method calls to interop protocol messages and vice versa according to specification and uses pluggable transport implementation to send/receive messages to/from broker.

Currently .NET and TypeScript/JavaScript libraries are provided.

Generated Client API

Application-specific wrapper on top of Generic Client API which is auto-generated based on Interop Registry by code-generation tool.

For each consumed service a proxy class is generated which allows invocation of service methods.

Executing action using generated proxy
const rate: ICcyPairRate = client.getCcyRateService().getRate({ ccyPairName: 'EURUSD' });

For each provided service an interface is generated which should be implemented to provide implementations for service methods.

Registering handler for provided service
clientBuilder.withCcyRateServiceProvider({
    onGetRate: (invocattionContext: InvocationContext, pair: ICcyPair) => {
        return internalService.getRate(pair);
    }
});

Interop Protocol

Interop Protocol is a language for conversations between client and broker through transport channels.

Each interop protocol conversation is performed within a separate transport channel.

Protocol describes 4 main types of conversations:

  • Connect handshake - the first conversation performed after new transport connection is established

  • Disconnect handshake - the last conversation performed before transport connection is closed

  • Discovery - request from application to broker to get the available services filtered by specific parameters

  • Method call - request from an application to call a method implemented in another application

Transport Protocol

Transport protocol is used to send binary messages between client and broker and is responsible for multiplexing and de-multiplexing many channels through a single connection.

Transport Connection

Transport connection is an abstraction of established connectivity between client and broker.

Connection is initiated by client and accepted by broker.

Transport implementation must detect if the connection is still alive, e.g. by sending heartbeats or using lower-level wire protocol capabilities for this, and raise "disconnected" event on both sides when connectivity is lost.

Transport Channel

Transport channel is a logical unit of data exchange through connection. It consist of request and response byte streams on both sides of connection.

Channel opening can be initiated by both client and broker. It’s only possible to write and read bytes to/from an opened channel. Many channels can be simultaneously opened in the context of the same connection.

For example, for each call from one application to another, 2 transport channels are opened. First is opened by source application to broker. Second is opened by broker to target application. All the data sent in context of the call is transferred through these 2 channels.

Bytes written to request stream on one side of channel can be read in exactly the same order from response stream on another side. As soon as one of the sides sent all the data it closes request stream. This triggers response stream completion event on another side as soon as all the sent bytes consumed.

Channel considered "Completed" when both sides completed request stream and consumed all the bytes from response stream. Additionally it can be terminated by either client or broker with either "Failed" or "Canceled" status in case of exception.

Wire Protocol

Wire protocol is an abstraction for sending bytes through cross-process boundaries. Any existing stream-based network protocol such as named pipes or websockets can be used as a wire protocol for Plexus Interop.

Broker listens to many wire protocols simultaneously on different addresses, so each client can choose which one to use. Usually for native apps it’s more convenient to use named pipes, but for web apps it’s more convenient to use websockets, because most of browsers has built-in websockets support.

Wire Connection

Wire connection is an abstraction of established cross-process connection.

Getting Started

Repository structure

Plexus Interop repository consist of the following main sections:

How to build

To build Plexus Interop components on your machine clone the repository and run Gradle build command:

git clone https://github.com/finos-plexus/plexus-interop.git
cd plexus-interop
gradlew build --console plain --no-daemon

Build artifacts are saved into bin directory :

  • win-x86/broker - binaries of interop broker

  • win-x86/samples - sample apps

  • win-x86/sdk - code generator tool

  • docs - documentation in HTML format

  • nuget - .NET NuGet packages to be used in apps

Sample apps

Repository contains a sample demonstrating how Plexus Interop can be used to build interoperability between apps on desktop.

The sample consist of .NET Core console app and web app running in Electron. As an example let us test how request-response call and discovery work:

  1. Build samples - see How to build for details

  2. Go to bin/win-x86/samples/greeting directory

  3. Launch Interop Broker – LaunchBroker.cmd

  4. Launch sample "Greeting Client" app – LaunchGreetingClient.cmd

  5. Choose “Discovery” option (5) first and then “Greeting from Electron Web app” option from discovery response:

    Sample-1
  6. Enter some name (e.g. “John”) and press Enter. Broker will launch "Web Greeting Server" app, which is configured to run in Electron container. Once launched the app will show the information about connection and incoming request and will send back a greeting response:

    Sample-2
  7. "Greeting Client" app should receive the greeting and print it:

    Sample-3
  8. Now choose Discovery (5) option in "Greeting Client" again and this time select “Greeting from .Net app” option from discovery results.

  9. Enter another name, e.g. “Mike” and press enter. Broker will start a ".NET Greeting Server"console app. Once running this app will print information about the incoming request and will send back the greeting:

    Sample-4

You may check other call types by choosing other options in the "Greeting Client" app.

Source code for sample flow is here:

Tutorial

In this section we will be going through the case of two simple business apps communicating with each other via Plexus Interop:

  • CcyPairRateProvider - an app developed by Vendor A, which provides FX currency pair rates.

  • CcyPairRateViewer - an app developed by Vendor B, which allows user to request FX currency pair rates from other apps and see them.

Two most popular cases for interoperability are web and .NET, so the guide is split into two independent sections:

Both guides describe the same flow/API, so if you want to experiment with interop between different technologies - just combine the samples, e.g. take web implementation of CcyPairRateViewer from the first guide and .NET implementation of CcyPairRateProvider from the second guide and run them together.

Prerequisites

You will need the following software:

You also need to build Plexus Interop components if you have not done it yet. See How to build for details.

Quick Start - Web

In this guide both consumer and provider are written in TypeScript language and run in Electron container.

Run example
  1. Switch to the bin\win-x86\quick-start directory and start a new broker instance:

    cd bin\win-x86\samples\quick-start
    start plexus broker

    Broker requires two types of metadata to run:

    • app registry - declares all available apps and how to launch them. App Lifecycle Manager component uses this registry to launch apps when requested.

    • interop registry - declares messages, services, and which of them are provided or consumed by each application. Broker uses this information to provide discovery capabilities and validate invocations between apps.

    By default Interop Broker reads metadata from apps.json and interop.json files located in metadata sub-directory within broker’s working directory. In real world scenarios, broker will load registries from remote server instead of local files, because remote server may provide many capabilities like (admin UI, central governance, audit trail, entitlements, etc.). To load metadata from other data sources you need to implement your own registry provider (documentation on that is in progress).

    Each Interop Broker instance is identified by its working directory. It is possible to run many brokers in parallel, but instance must run in a separate directory.

  2. Launch WebCcyPairRateViewer app:

    plexus launch vendor_b.fx.WebCcyPairRateViewer

    plexus launch command sends a request to App Lifecycle Manager to make some app up and running. App Lifecycle Manager is a part of the broker that tracks which apps are alive and can create a new instance of an app if there is no live one. Different app types may have very different life cycles, so App Lifecycle Manager launches new app instances and tracks their status using App Launchers.

  3. App window will open. In this window you should see "Connected to Broker" message.

  4. Enter a currency pair name (e.g. "EURUSD") and click "Get Rate" button:

    Quick Start Web 1

    When app A calls an interop service hosted in app B the following steps happen:

    • Broker searches for the already connected live service providers

    • If service provider is offline, broker inspects App Registry and sends a launch request to App Lifecycle Manager.

    • While app is starting broker waits (with some timeout)

    • Service provider within the launched app connects to the broker and registers itself

    • Broker sends interop message to the service provider

    See Invocation Workflow section for details.

    This guide uses a sample ElectronAppLauncher. To understand how to build custom launchers see How to write custom app launcher.

  5. Broker starts "CCY Rate Provider" app and forwards request to it. App shows information about the request and sends back a random value.

    Quick Start Web 2
  6. The "CCY Rate Viewer" app shows the rate received from provider:

    Quick Start Web 3
Modify example

Let us add real-time updates of FX rates instead of a single response. For this we have to change the contract between two apps, provide updated interop metadata to the broker, update apps and run them again.

Modify metadata

First we need to modify interop registry and pass updated metadata to the broker.

  1. Open registry/fx/ccy_pair_rate_service.proto file and add a new GetRateStream method:

    /registry/fx/ccy_pair_rate_service.proto
    service CcyPairRateService {
        rpc GetRate(CcyPair) returns (CcyPairRate);
    
        // new rpc call returning real-time notification stream
        rpc GetRateStream(CcyPair) returns (stream CcyPairRate);
    }
  2. Open registry/vendor_a/fx/web_ccy_pair_rate_provider.interop file and change provided method to GetRateStream:

    /registry/vendor_a/fx/web_ccy_pair_rate_provider.interop
    package vendor_a.fx;
    
    import "fx/ccy_pair_rate_service.proto";
    
    application WebCcyPairRateProvider {
        provides .fx.CcyPairRateService {
            GetRateStream [title = "Web Provider - Get Rate Stream"];
        }
    }
  3. Open registry/vendor_b/fx/web_ccy_pair_rate_viewer.interop file and change consumed method to GetRateStream:

    /registry/vendor_b/fx/web_ccy_pair_rate_viewer.interop
    package vendor_b.fx;
    
    import "fx/ccy_pair_rate_service.proto";
    
    application WebCcyPairRateViewer {
        consumes .fx.CcyPairRateService { GetRateStream; }
    }
  4. To pass the changed metadata to broker, navigate back to bin\win-x86\samples\quick-start and re-generate metadata using plexusgen tool:

    java -jar ../../sdk/plexusgen.jar --type=json_meta --baseDir=registry --out=metadata

    Broker tracks interop.json file and automatically reloads metadata when the file is changed.

Modify and build provider app

We changed provided service definition, so now we need to re-generate provider’s client and provide implementation for new method:

  1. From project root call Plexus Generator tool to re-generate provider’s client code:

    java -jar bin/win-x86/sdk/plexusgen.jar --type=ts --baseDir=./bin/win-x86/samples/quick-start/registry --input=web_ccy_pair_rate_provider.interop --out=./web/packages/ccy-pair-rate-provider/src/gen --protoc=./web/node_modules/.bin/pbts.cmd

    Plexus Generator tool will generate interfaces and proxy classes from service descriptions. It uses ProtobufJs to generate message definitions from .proto files.

  2. Open web/packages/ccy-pair-rate-provider/src/index.ts file and change onGetRate implementation to onGetRateStream:

    new WebCcyPairRateProviderClientBuilder()
        .withClientDetails({
            applicationId: "vendor_a.fx.WebCcyPairRateProvider",
            applicationInstanceId: instanceId
        })
        .withTransportConnectionProvider(() => new WebSocketConnectionFactory(new WebSocket(webSocketUrl)).connect())
        .withCcyPairRateServiceInvocationsHandler({
            onGetRateStream: async (invocationContext, ccyPair, hostClient) => {
                log(`Received Streaming request for ${ccyPair.ccyPairName} Rate`);
                // send several rates and complete the invocation
                let count = 5;
                const sendResponse = () => {
                    if (count) {
                        count--;
                        hostClient.next(rateService.getRate(ccyPair.ccyPairName));
                        setTimeout(sendResponse, 1000);
                    } else {
                        hostClient.complete();
                    }
                };
                sendResponse();
            }
        })
        .connect()
        .then(() => log("Connected to Broker"))
        .catch(e => console.error("Connection failure", e));

    In this example provider will send five rates and then will close the stream.

  3. Navigate to web/packages/ccy-pair-rate-provider directory and run npm run build to re-build the app.

Modify and build consumer app

Because interop method was changed we also need to re-generate consumer’s client code and modify how response is handled:

  1. From project root call Plexus Generator tools to re-generate consumer’s client code:

    java -jar bin/win-x86/sdk/plexusgen.jar --type=ts --baseDir=./bin/win-x86/samples/quick-start/registry --input=web_ccy_pair_rate_viewer.interop --out=./web/packages/ccy-pair-rate-viewer/src/gen --protoc=./web/node_modules/.bin/pbts.cmd
  2. Open web/packages/ccy-pair-rate-viewer/src/index.ts and update code to invoke streaming getRateStream action:

    new WebCcyPairRateViewerClientBuilder()
        .withClientDetails({
            applicationId: "vendor_b.fx.WebCcyPairRateViewer",
            applicationInstanceId: instanceId
        })
        .withTransportConnectionProvider(
            () => new WebSocketConnectionFactory(new WebSocket(webSocketUrl)).connect()
        )
        .connect()
        .then(async (rateViewerClient: WebCcyPairRateViewerClient) => {
            log("Connected to Broker");
            window.getRate = async () => {
                const ccyPair = (document.getElementById("ccyPair") as HTMLInputElement).value;
                log(`Sending request for ${ccyPair}`);
                rateViewerClient.getCcyPairRateServiceProxy()
                    .getRateStream({ccyPairName: ccyPair}, {
                        next: ccyRate => {
                            log(`Received rate ${ccyRate.ccyPairName} - ${ccyRate.rate}`);
                        },
                        complete: () => log("Completed"),
                        error: () => log("Error received")
                    });
            };
        });
  3. Navigate to web/packages/ccy-pair-rate-viewer and run npm run build to re-build the app.

Launch the modified example
  1. Navigate to bin\win-x86\samples\quick-start

  2. Launch the viewer app:

    plexus launch vendor_b.fx.WebCcyPairRateViewer
  3. In the opened window enter a currency pair name, e.g. EURUSD and press Enter:

    Quick Start Web 6
  4. Broker starts the provider app and redirects request to it:

    Quick Start Web 5
  5. The viewer app shows five rates received from provider and a message that reply stream was closed:

    Quick Start Web 4

Quick Start - .NET

In this guide both consumer and provider are .NET console apps written in C# language running under .NET Core.

Run example
  1. Switch to the bin\win-x86\quick-start directory and start a new broker instance:

    cd bin\win-x86\samples\quick-start
    start plexus broker

    Broker requires two types of metadata to run:

    • app registry - declares all available apps and how to launch them. App Lifecycle Manager component uses this registry to launch apps when requested.

    • interop registry - declares messages, services, and which of them are provided or consumed by each application. Broker uses this information to provide discovery capabilities and validate invocations between apps.

    By default Interop Broker reads metadata from apps.json and interop.json files located in metadata sub-directory within broker’s working directory. In real world scenarios, broker will load registries from remote server instead of local files, because remote server may provide many capabilities like (admin UI, central governance, audit trail, entitlements, etc.). To load metadata from other data sources you need to implement your own registry provider (documentation on that is in progress).

    Each Interop Broker instance is identified by its working directory. It is possible to run many brokers in parallel, but instance must run in a separate directory.

  2. Launch "CCY Pair Viewer" app:

    plexus launch vendor_b.fx.CcyPairRateViewer

    plexus launch command sends a request to App Lifecycle Manager to make some app up and running. App Lifecycle Manager is a part of the broker that tracks which apps are alive and can create a new instance of an app if there is no live one. Different app types may have very different life cycles, so App Lifecycle Manager launches new app instances and tracks their status using App Launchers.

    In this example apps are launched using built-in NativeAppLauncher which run apps as standard OS processes.

  3. In the opened console window enter a currency pair name, e.g. EURUSD and press Enter:

    Quick Start .NET 1

    When app A calls an interop service hosted in app B the following steps happen:

    • Broker searches for the already connected live service providers

    • If service provider is offline, broker inspects App Registry and sends a launch request to App Lifecycle Manager.

    • While app is starting broker waits (with some timeout)

    • Service provider within the launched app connects to the broker and registers itself

    • Broker sends interop message to the service provider

    See Invocation Workflow section for details.

    Note that CcyPairRateProvider app uses the same NativeAppLauncher as the viewer app.

  4. Once CcyPairRateProvider app is up and running it registers in Broker, receives the message from viewer app and sends back a random rate:

    Quick Start .NET 2
  5. The viewer app shows the rate received from provider:

    Quick Start .NET 3
Modify example

Let us add real-time updates of FX rates instead of a single response. For this we have to change the contract between two apps, provide updated interop metadata to the broker, update apps and run them again.

Modify metadata

First we need to modify interop registry and pass updated metadata to the broker.

  1. Open registry/fx/ccy_pair_rate_service.proto file and add new GetRateStream method:

    /registry/fx/ccy_pair_rate_service.proto
    service CcyPairRateService {
        rpc GetRate(CcyPair) returns (CcyPairRate);
    
        // new rpc call returning real-time notification stream
        rpc GetRateStream(CcyPair) returns (stream CcyPairRate);
    }
  2. Open registry/vendor_a/fx/ccy_pair_rate_provider.interop file and change provided method to GetRateStream:

    /registry/vendor_a/fx/ccy_pair_rate_provider.interop
    package vendor_a.fx;
    
    import "fx/ccy_pair_rate_service.proto";
    
    application CcyPairRateProvider {
        provides .fx.CcyPairRateService {
            GetRateStream [title = ".NET Provider - Get Rate Stream"];
        }
    }
  3. Open registry/vendor_b/fx/ccy_pair_rate_viewer.interop file and change consumed method to GetRateStream:

    /registry/vendor_b/fx/ccy_pair_rate_viewer.interop
    package vendor_b.fx;
    
    import "fx/ccy_pair_rate_service.proto";
    
    application CcyPairRateViewer {
        consumes .fx.CcyPairRateService { GetRateStream; }
    }
  4. To pass updated metadata to the broker, go back to bin\win-x86\samples\quick-start directory and re-generate metadata using Plexus Generator tool:

    java -jar ../../sdk/plexusgen.jar --type=json_meta --baseDir=registry --out=metadata

    Broker tracks interop.json file and automatically reloads metadata file is modified.

Modify and build provider app

Now let us update CcyPairRateProvider app to provide real-time notifications.

  1. Navigate to the desktop/src/Plexus.Interop.Samples.CcyPairRateProvider directory and call Generate.cmd

    Plexus Generator tool generates interfaces and proxy classes from service descriptions. Internally it uses protobuf compiler to generate C# message definitions from .proto files.

  2. Open Program.cs file and update the code to provide implementation of the new method:

    /desktop/src/Plexus.Interop.Samples.CcyPairRateProvider/Program.cs
    namespace Plexus.Interop.Samples.CcyPairRateProvider
    {
        using Plexus.Interop.Samples.CcyPairRateProvider.Generated;
        using System;
        using System.IO;
        using System.Threading.Tasks;
        using Plexus.Channels;
    
        public sealed class Program : CcyPairRateProviderClient.ICcyPairRateServiceImpl
        {
            private readonly Random _random = new Random();
    
            public static void Main(string[] args)
            {
                new Program().MainAsync(args).GetAwaiter().GetResult();
            }
    
            public async Task MainAsync(string[] args)
            {
                // Read broker working dir specified either in the first
                // command line argument or in environment variable,
                // or just use current working directory.
                var brokerWorkingDir = args.Length > 0
                    ? args[0]
                    : EnvironmentHelper.GetBrokerWorkingDir() ?? Directory.GetCurrentDirectory();
    
                // Creating client and connecting to broker
                Console.WriteLine("Connecting to broker {0}", brokerWorkingDir);
                var client = new CcyPairRateProviderClient(this, setup => setup.WithBrokerWorkingDir(brokerWorkingDir));
                await client.ConnectAsync();
                Console.WriteLine("Connected. Waiting for requests. Press CTRL+C to disconnect.");
                Console.CancelKeyPress += (sender, eventArgs) =>
                {
                    eventArgs.Cancel = true;
                    client.Disconnect();
                };
    
                // Awaiting completion
                await client.Completion;
                Console.WriteLine("Disconnected.");
            }
    
            // Implementation of server streaming method GetRateStream
            public async Task GetRateStream(
                CcyPair request,
                IWritableChannel<CcyPairRate> responseStream,
                MethodCallContext context)
            {
                Console.WriteLine("Received subscription: {0}", request);
                try
                {
                    do
                    {
                        var response = GetCcyPairRate(request);
                        Console.WriteLine("Sending response: {0}", response);
                        await responseStream.TryWriteAsync(response, context.CancellationToken);
                        await Task.Delay(_random.Next(1000, 3000), context.CancellationToken);
                    } while (!context.CancellationToken.IsCancellationRequested);
                }
                catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
                {
                    // Ignoring cancellation exception
                }
                Console.WriteLine("Subscription completed");
            }
    
            private CcyPairRate GetCcyPairRate(CcyPair request)
            {
                CcyPairRate response;
                switch (request.CcyPairName)
                {
                    case "EURUSD":
                        response = new CcyPairRate
                        {
                            CcyPairName = "EURUSD",
                            Rate = 1.15 + 0.05 * _random.NextDouble()
                        };
                        break;
                    case "EURGBP":
                        response = new CcyPairRate
                        {
                            CcyPairName = "EURGBP",
                            Rate = 0.87 + 0.05 * _random.NextDouble()
                        };
                        break;
                    default:
                        throw new ArgumentOutOfRangeException($"Unknown currency pair: {request.CcyPairName}");
                }
    
                return response;
            }
        }
    }
  3. Re-build the app by executing the following command:

dotnet build
Modify and build consumer app

Now let us update "CCY Pair Rate Viewer" app to handle multiple incoming real-time updates.

  1. Navigate to the desktop/src/Plexus.Interop.Samples.CcyPairRateViewer directory and call Generate.cmd

  2. Navigate to the desktop/src/Plexus.Interop.Samples.CcyPairRateViewer directory, open Program.cs file and update the code to the following:

    /desktop/src/Plexus.Interop.Samples.CcyPairRateViewer/Program.cs
    namespace Plexus.Interop.Samples.CcyPairRateViewer
    {
        using Plexus.Interop.Samples.CcyPairRateViewer.Generated;
        using System;
        using System.IO;
        using System.Threading.Tasks;
    
        public sealed class Program
        {
            public static void Main(string[] args)
            {
                new Program().MainAsync(args).GetAwaiter().GetResult();
            }
    
            public async Task MainAsync(string[] args)
            {
                // Read broker working dir specified either in the first
                // command line argument or in environment variable,
                // or just use current working directory.
                var brokerWorkingDir = args.Length > 0
                    ? args[0]
                    : EnvironmentHelper.GetBrokerWorkingDir() ?? Directory.GetCurrentDirectory();
    
                // Creating client and connecting to broker
                Console.WriteLine("Connecting to broker {0}", brokerWorkingDir);
                var client = new CcyPairRateViewerClient(setup => setup.WithBrokerWorkingDir(brokerWorkingDir));
                await client.ConnectAsync();
                Console.WriteLine("Connected");
    
                while (true)
                {
                    Console.Write("Enter currency pair (e.g. \"EURUSD\") or press Enter to exit: ");
                    var ccyPairName = Console.ReadLine();
                    if (string.IsNullOrEmpty(ccyPairName))
                    {
                        break;
                    }
    
                    // Requesting ccy pair rate from another app
                    var request = new CcyPair { CcyPairName = ccyPairName };
                    var call = client.CcyPairRateService.GetRateStream(request);
    
                    ConsoleCancelEventHandler cancelHandler = null;
                    cancelHandler = (sender, eventArgs) =>
                    {
                        Console.CancelKeyPress -= cancelHandler;
                        Console.WriteLine("Unsubscribing");
                        eventArgs.Cancel = true;
                        call.Cancel();
                    };
    
                    Console.CancelKeyPress += cancelHandler;
    
                    await call.ResponseStream
                        .ConsumeAsync(item =>
                        {
                            Console.WriteLine("Notification received: {0}", (object)item);
                            Console.WriteLine("Press CTRL+C to unscubscribe");
                        })
                        .IgnoreAnyCancellation();
                }
    
                Console.WriteLine("Disconnecting");
                await client.DisconnectAsync();
                Console.WriteLine("Disconnected");
            }
        }
    }
  3. Re-build the app by executing the following command:

dotnet build
Launch the modified example
  1. Launch "CCY Pair Rate Viewer":

    plexus launch vendor_b.fx.CcyPairRateViewer
  2. In the opened console window enter a currency pair name, e.g. EURUSD and press Enter:

    Quick Start .NET 4
  3. Broker starts "CCY Pair Rate Provider" app and redirects request to it:

    Quick Start .NET 5
  4. The viewer app will show rate updates coming from provider until you press CTRL-C:

    Quick Start .NET 6

Guides

Tools

This section describes available tools which simplify development and testing process.

Plexus Command Line Interface (CLI)

Plexus Command Line Interface used for metadata validation and generating the strong-typed client code in different languages.

Installation

Plexus CLI tool can be installed using following command:

npm i @plexus-interop/cli --global

If you work in restricted environment, then please consider adjusting following environment properties before installation:

Environment variable name Details

PLEXUS_JRE_DOWNLOAD_URL

Custom JRE package download URL

PLEXUS_CLI_JAVA_EXE_PATH

Full path to java executable

PLEXUS_PROTOC_DOWNLOAD_URL

Custom Proto Compiler package download url

PLEXUS_CLI_PROTOC_EXE_PATH

Full path to protoc executable

Usage

To list all available commands please use:

plexus --help

it will print all commands with their descriptions:

  Usage: plexus [options] [command]

  Options:

    -V, --version             output the version number
    -h, --help                output usage information

  Commands:

    gen-ts [options]          generate Typescript client and messages definitions for specified entry point
    gen-json-meta [options]   generate metadata in JSON format
    gen-proto [options]       generate proto definitions for specified entry point
    gen-csharp [options]      generate C# client and messages definitions for specified entry point
    validate [options]        validate metadata
    validate-patch [options]  validate metadata update

  Help for specific command:  plexus [command] --help

to get help for specific command please use:

plexus [command] --help

Plexus Studio

Plexus Studio is a development tool to explore and test Plexus Interop APIs defined in metadata. It can connect to Plexus Interop on behalf of any available application, handles invocations and executes any consumed action.

Invoke action workflow

Let’s take a look at simple workflow of simulating action invocation. It provides an ability to test your Provider application without installing/getting any information about possible Consumer applications. To invoke action please follow steps below:

  1. Find required application in the list and click it to connect on behalf of this Application:

    Plexus Studio Apps
  2. Choose required Action from Consumed Services section

    Plexus Studio Consumer
  3. On Consumed Action screen you can:

    • Specify request payload in JSON format

    • Discover available action providers

    • Choose target application

    • Send request and get response details

    Plexus Studio Consumed action

Invocation workflow

This section presents the most interesting scenario of invocation, which includes app launch:

diag 94484b5c9cb52f30160f27f639a75cb6
  • When Desktop Plexus is started it creates transport endpoints and start listen for incoming connections.

  • Plexus Interop launches app A and provides it with the configuration on how to connect to transport servers.

  • App A initializes Client APIs and connects to Broker. Client and Broker performs handshake after which Broker registers the connected instance start tracking it.

Now user wants to send some information from app A into another app B available in Plexus. Here is what happens in app A:

  • App A creates an input message instance using builder provided by Client API library

  • App A calls service method in Client API and passes created message instance

  • Client API performs several steps:

    • Creates header which contains information about the invocation: sender, receiver, service name, method name etc.

    • Serializes the created header via Protobuf and submits into Interop Transport for sending

    • Serializes the created input message into binary stream using Protobuf and submits into Interop Transport for sending

    • Transport splits the incoming message stream into one or more frames and send them to the pipe

Plexus Host process is doing the following:

  • Interop Transport reads binary stream from transport and re-constructs headers and messages from received frames.

  • When new header arrives Transport starts to build new message blob from frames related to the message

  • Once header and message blobs are constructed, Transport passes them into Interop Broker

  • Interop Broker deserializes header blob and checks information about message receiver

  • If target app is not running then Interop Broker sends "launch app" event to App Launcher with the ID of app B and waits

  • App Launcher starts target app B. After launch app B will:

    • Register callback for process API calls

    • Register its instance in Interop Broker (same as app A)

  • Interop Broker detects that app B is live and sends message into it using Transport

Now that message is sent to app B via transport remaining part is easy:

  • Interop Transport within app B restores message blob and passes it into Client API

  • Client API deserializes blob into strongly typed message and invokes callback registered by app B code

  • App B processes the message and sends reply using same approach as original request, but in reverse direction

Plexus Interop integration workflow

Typical development cycle for integration with Plexus Interop:

diag 5d068d7005ef3b3fabcfa81d9515e4bd
  1. Clone existing interop API definitions (metadata) from remote central registry

  2. Define new interop metadata locally

  3. Generate client interop APIs from local metadata

  4. Integrate generated interop APIs into the app

  5. Publish new metadata into local "sandbox" registry

  6. Test interop flow via Plexus Studio:

    1. If app provides service then you can call it from Plexus Studio.

    2. If app consumes some service then you can check that app sends correct interop messages into Plexus Studio.

    3. Repeat steps 2-5 until you are fully happy with API and its implementation.

  7. Deploy new version of the app with new interop flow

  8. Publish interop metadata into central registry for approval

  9. Once new metadata is approved it should get into central Interop Registry and become available to all users

  10. Test flow between apps end-to-end in real environment

How to write custom App Launcher

App Launchers are just applications which implements the special contract called interop.AppLauncherService.

To implement custom app launcher need to do the following:

  1. Define new application in interop registry that provides service interop.AppLauncherService service to interop.AppLifecycleManager. See example below.

  2. Generate client API for the application.

  3. Implement methods defined by interop.AppLauncherService.

  4. Register the new app in App Registry. Use built-in interop.NativeAppLauncher as a launcher for it. See example below.

  5. Use environment variables to get broker location and instance id for connection.

How does this work? If app launcher is not started yet, Interop Broker will start it and pass environment variables "PLEXUS_APP_INSTANCE_ID" and "PLEXUS_BROKER_WORKING_DIR". First variable should be used in connection message 'AppInstanceId' property to let broker know the connecting instance is the one it is waiting for. The second variable defines broker location. Transports should check the directory "%PLEXUS_BROKER_WORKING_DIR%\servers\<servername>" to get connectivity details. For example, websocket server connection URL is specified in "%PLEXUS_BROKER_WORKING_DIR%\servers\ws-v1\address".

Example definition of ElectronAppLauncher

AppLauncherService.proto
syntax = "proto3";

package interop;

import "interop/UniqueId.proto";

service AppLauncherService {
        rpc Launch(AppLaunchRequest) returns (AppLaunchResponse);
}

message AppLaunchRequest {
        string app_id = 1;
        string launch_params_json = 2;
}

message AppLaunchResponse {
        UniqueId app_instance_id = 1;
}
ElectronAppLauncher.interop
import "interop/AppLauncherService.proto";

package interop;

application ElectronAppLauncher {

        provides AppLauncherService to interop.AppLifecycleManager { Launch; }
}
apps.json
{
      "id": "interop.ElectronAppLauncher",
      "displayName": "Electron App Launcher",
      "launcherId": "interop.NativeAppLauncher",
      "launcherParams": {
        "cmd": "../ElectronAppLauncher.cmd",
        "args": ""
      }
}

System requirements

Supported Windows versions

  • Windows 7 SP1

  • Windows 8.1

  • Windows 10

  • Windows Server 2008 R2 SP1

  • Windows Server 2012 SP1

  • Windows Server 2012 R2 SP1

  • Windows Server 2016

Supported Mac OS X versions

Not supported for now. Will be added soon.

Reference

App Registry

Application registry is a JSON array where each item has following properties:

Table 1. Application properties
Name Type Description Example

id

string

Unique key representing the application

"interop.samples.GreetingServer"

displayName

string

Human readable display name of the application

"App which provides greetings to other apps"

launcherId

string

A reference to another application in the registry which starts this app

"interop.NativeAppLauncher"

launcherParams

object

Application launcher specific object which contains data required by launcher to launch the specific application

{ "cmd": "../apps/GreetingServer/GreetingServer.exe", "args": "" }

Find below a JSON schema for the application registry

{
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "id": {
                "type": "string",
                "required": true
            },
            "displayName": {
                "type": "string",
                "required": false
            },
            "launcherId": {
                "type": "string",
                "required": false
            },
            "launcherParams": {
                "type": "object",
                "required": false,
                "additionalProperties": true
            }
        }
    }
}

Interop Registry

Messages and Services

For messages and services definition Plexus uses Google Protobuf v3 format. For example:

package fx;

message CcyPair {
  string ccyPairName = 1;
}

message CcyPairRate {
  string ccyPairName = 1;
  double rate = 2;
}

service CcyPairRateService {
  rpc GetRate (CcyPair) returns (CcyPairRate);
}

You can find Protobuf specification here - https://developers.google.com/protocol-buffers/docs/proto3

Application Interop Manifests

Application interop manifests are defined using special .interop format. Here is an example:

package vendorA.fx; (1)

import "fx/CcyPairRateService.proto"; (2)

application CcyPairRateProvider { (3)
    provides fx.CcyPairRateService to vendorA.* { GetRate; } (4)
    consumes interop.samples.GreetingService from interop.samples.GreetingServer { ShowGreeting; } (5)
    consumes interop.samples.EchoService from interop.* { Echo; } (6)
}
1 Define namespace for application
2 Import services definition that application provides or consumes
3 Application name
4 Provided service definition. In this case app only allows using this implementation for other apps in package "vendorA".
5 Consumed service definition
6 One more consumed service definition.
Consumes

Consumes notation defines to which Service methods this component can execute. This Methods can be provided by several components and they will be accessible by discovery for component client code. We explicitly ask to specify exact methods to support schema evolution and corner cases where one component produce not the full set of service methods

Imagine the following scenario:

  • There is "ServiceA" fully implemented and produced by two components "ComponentA" and "ComponentB"

  • We add new additional method to "ServiceA"

  • "ComponentA" implements this method, where "ComponentB" is still in progress

Thus exact method import will allow plexus to check if some methods used anywhere, it will allow to check if method is produced by both components and give error if not.

Provides

We ask to provide explicit list of methods for consume and provide notations to support service evolutions and changes. Plexus will be able to check if what set of methods of specific Service implemented in concrete Component and validate if all mapping are correct and consistent.

TypeScript Client API

Send Unary Request

Executes Point-to-Point invocation, client receives response from server.

sendUnaryRequest<Req, Res>(
    invocationInfo: InvocationDescriptor,
    requestMessage: Req,
    responseHandler: ValueHandler<Res>): Promise<InvocationClient>;

Parameters

invocationInfo

details of remote action to execute

requestMessage

simple object representing request details

responseHandler

response/error handler

Returns

invocationClient

provides ability to cancel invocation, to notificate remote side that response no longer required

Examples

  • Invoke remote action on any available provider:

const invocationInfo: InvocationDescriptor = {
    serviceId: 'CcyRateService',
    methodId: 'GetRate'
};
const requestMessage = { ccyPairName: 'EURUSD' };
client.sendUnaryRequest<ICcyPair, ICcyPairRate>(invocationInfo, requestMessage, {
    value: pairRate => console.log(`Received rate ${pairRate.rate}`),
    error: e => console.error('Failed to get rate', e)
});
  • Invoke remote action on particular provider:

const invocationInfo: InvocationDescriptor = {
    serviceId: 'CcyRateService',
    methodId: 'GetRate',
    // provider's app id, defined in metadata
    applicationId: 'RateProvider',
    // optional, if we want to pass invocation to particular running instance
    connectionId: remoteAppGuid
};
// same as in example above ...

Register Unary Handler

Registers handler for provided unary method.

onUnary<Req, Res>(
    serviceInfo: ServiceInfo,
    handler: UnaryInvocationHandler<Req, Res>): void;

Parameters

serviceInfo

implemented service details

handler

method details and provided implementation

Examples

  • Register unary invocation handler

    const serviceInfo = { serviceId: 'CcyRateService' };
    clientBuilder.onUnary<ICcyPair, ICcyPairRate>(serviceInfo, {
        methodId: 'GetRate',
        handle: async (invocationContext: MethodInvocationContext, request: ICcyPair) => {
            const rate: ICcyPairRate = await internalRateService.getRate(request.ccyPairName);
            return rate;
        }
    };

Discover Methods

Discovers available method invocation handlers.

discoverMethod(discoveryRequest: MethodDiscoveryRequest): Promise<MethodDiscoveryResponse>;

Parameters

discoveryRequest

discover criteria, defined in client protocol. All fields are optional, providing flexibility to search under different criteria.

Returns

discoveryResponse

discovered remote method implementations if any, containing extensive information on method provider. Discovered method instances can be passed to corresponding invocation methods.

Examples

  • Discover by input message and invoke first available implementation:

const discoveryResponse = await client.discoverMethod({
    inputMessageId: 'plexus.interop.CcyPair',
});
client.sendUnaryRequest<ICcyPair, ICcyPairRate>(
    discoveryResponse.methods[0],
    { ccyPairName: 'EURUSD' },
    {
        value: pairRate => console.log(`Received rate ${pairRate.rate}`),
        error: e => console.error('Failed to get rate', e)
    }
);
Discovery Request supports various of options, you can use any combinations of them.
  • Discover by response type:

const discoveryResponse = await client.discoverMethod({
    outputMessageId: 'plexus.interop.CcyPairRate'
});
  • Discover only online method handlers:

const discoveryResponse = await client.discoverMethod({
    outputMessageId: 'plexus.interop.CcyPairRate',
    discoveryMode: DiscoveryMode.Online
});
  • Discover by method reference:

const discoveryResponse = await client.discoverMethod({
    consumedMethod: {
        consumedService: {
            serviceId: 'CcyRateService'
        },
        methodId: 'GetRate'
    }
});
  • Implementing "broadcast" communication using discovery.

Using discoveryMethod as low level API we can easily implement sending of broadcast messages using following steps:

  1. Define method with Empty return type to be handled by listener components

    service CcyPairBroadcastService {
        rpc onCcyPairRate (CcyPairRate) returns (Empty);
    }
  2. Setup all listener apps as provider of this action. Also we can define specific Provider instances or even companies we would like to listen events from:

    application ListenerApp {
        // listen to message from any provider
        provides CcyPairBroadcastService {onCcyPairRate;}
        // or setup listener for providers from specific namespace
        // provides CcyPairBroadcastService to com.acme.* {onCcyPairRate;}
    }
  3. Setup broadcast publisher app to consume this action

    application RateProviderApp {
        consumes CcyPairBroadcastService {onCcyPairRate;}
    }
  4. Use online discovery to publish new rate to all connected consumers:

    async function broadcast(rate: CcyPairRate): Promise<void> {
        const discoveryResponse = await client.discoverMethod({
            consumedMethod: {
                consumedService: {
                    serviceId: 'CcyPairBroadcastService'
                },
                methodId: 'onCcyPairRate'
            },
            discoveryMode: DiscoveryMode.Online
        });
        const responseHandler = {
            value: () => console.log('Delivered')
        };
        discoveryResponse.methods.forEach(method => {
            client.sendUnaryRequest<ICcyPairRate, Empty>(
                method,
                rate,
                responseHandler);
        });
    }

Discover Services

Discovers available service implementations. Provides ability to use few methods from particular provider together.

discoverService(discoveryRequest: ServiceDiscoveryRequest): Promise<ServiceDiscoveryResponse>;

Parameters

discoveryRequest

discover criteria, defined in client protocol. All fields are optional, providing flexibility to search under different criteria.

Returns

discoveryResponse

discovered remote service implementations if any, containing extensive information on method provider. Discovered method instances can be passed to corresponding invocation methods.

Examples

  • Discover service, lookup for pair of methods to be used together:

    const discoveryResponse = client.discoverService({
        consumedService: {
            serviceId: 'CcyRateService'
        }
    });
    // choose service implementation, by provider id/service alias/other details
    const service: DiscoveredService = serviceDiscoveryResponse
        .services
        .find(s => providedService.applicationId === 'RateProviderApp');
    // choose required methods and invoke them as described above in 'Discover Methods' examples
    const getRateMethod = service.methods.find(m => m.methodId === 'GetRate');
    const getRateStreamMethod = service.methods.find(m => m.methodId === 'GetRateStream');
  • Discover only online service providers

    const discoveryResponse = client.discoverService({
        consumedService: {
            serviceId: 'CcyRateService'
        },
        discoveryMode: DiscoveryMode.Online
    });

Send Server Streaming Request

Executes Server Streaming invocation, client receives stream of messages from server.

sendServerStreamingRequest<Req, Res>(
    invocationInfo: InvocationDescriptor,
    requestMessage: Req,
    responseObserver: InvocationObserver<Res>): Promise<InvocationClient>;

Parameters

invocationInfo

details of remote action to execute

requestMessage

simple object representing request details

responseObserver

response stream observer

Returns

invocationClient

provides ability to cancel invocation, to notificate remote side that response no longer required

Examples

  • Receive stream or rates from provider:

    const invocationInfo: InvocationDescriptor = {
        serviceId: 'CcyRateService',
        methodId: 'GetRateStream'
    };
    const requestMessage = { ccyPairName: 'EURUSD' };
    client.sendServerStreamingRequest<ICcyPair, ICcyPairRate>(invocationInfo, requestMessage, {
        next: pairRate => console.log(`Received rate ${pairRate.rate}`),
        complete: () => console.log('Invocation completed'),
        error: e => console.error('Failed to get rate', e)
    });

Register Server Streaming Handler

Registers handler for provided server streaming method.

onServerStreaming<Req, Res>(
    serviceInfo: ServiceInfo,
    handler: ServerStreamingInvocationHandler<Req, Res>): void;

Parameters

serviceInfo

implemented service details

handler

method details and provided implementation

Examples

  • Register stream handler:

    const serviceInfo = { serviceId: 'CcyRateService' };
    clientBuilder.onServerStreaming<ICcyPair, ICcyPairRate>(serviceInfo, {
        methodId: 'GetRateStream',
        handle: async (invocationContext: MethodInvocationContext,
                request: ICcyPair,
                hostClient: StreamingInvocationClient<ICcyPairRate>) => {
            const rate: ICcyPairRate = await internalRateService.getRate(request.ccyPairName);
            hostClient.next(rate);
            // send other rate
            const otherRate: ICcyPairRate = await internalRateService.getRate(request.ccyPairName);
            hostClient.next(otherRate);
            // complete streaming
            hostClient.complete();
        }
    };

Send Bidirectional Streaming Request

Executes Bidirectional Streaming invocation. Client and Server communicates using streams of messages.

sendBidirectionalStreamingRequest<Req, Res>(
    invocationInfo: InvocationDescriptor,
    responseObserver: InvocationObserver<Res>): Promise<StreamingInvocationClient<Req>>;

Parameters

invocationInfo

details of remote action to execute

responseObserver

response stream observer

Returns

streamingInvocationClient

invocation client, provides ability to send stream of messages, complete or cancel invocation.

Examples

  • Bidirectional communication, client updates server on requested ccy pairs and receives stream of rates from server:

    const invocationInfo: InvocationDescriptor = {
        serviceId: 'CcyRateService',
        methodId: 'GetRateBidiStream'
    };
    const invocation = await client.sendBidirectionalStreamingRequest<ICcyPair, ICcyPairRate>(
        invocationInfo,
        {
            next: pairRate => console.log(`Received rate ${pairRate.rate}`),
            streamCompleted: () => console.log('Remote stream completed'),
            complete: () => console.log('Invocation completed'),
            error: e => console.error('Failed to get rate', e)
        });
    invocation.next({ ccyPairName: 'EURUSD' });
    // ask for other ccy pair rates later
    invocation.next({ ccyPairName: 'EURGBP' });
    // notify server that no more pairs be requested
    invocation.complete();

Register Bidirectional Streaming Handler

Registers handler for provided bidirectional streaming method.

onBidiStreaming<Req, Res>(
    serviceInfo: ServiceInfo,
    handler: BidiStreamingInvocationHandler<Req, Res>): void;

Parameters

serviceInfo

implemented service details

handler

method details and provided implementation

Examples

  • Register bidirectional handler:

    const serviceInfo = { serviceId: 'CcyRateService' };
    clientBuilder.onBidiStreaming<ICcyPair, ICcyPairRate>(serviceInfo, {
        methodId: 'GetRateStream',
        handle: (invocationContext: MethodInvocationContext,
                hostClient: StreamingInvocationClient<ICcyPairRate>) => {
            const ccyPairs = [];
            // send rate for each requested pair every second
            setInterval(() => {
                ccyPairs.forEach(pair => {
                    hostClient.next(internalRateService.getRate(pair)
                });
            }, 1000);
            // client's stream observer
            return {
                next: ccyPair => ccyPairs.push(ccyPair)
            };
        }
    };