Sonoma Partners Microsoft CRM and Salesforce Blog

Salesforce Streaming API in .NET (Part 1)

Today’s guest blogger is Nathen Drees, a Developer at Sonoma Partners.

Overview

One of the things we have the opportunity to help our customers with here at Sonoma Partners is integrating their CRM systems with their other software systems. For example, many customers have ERP (Enterprise Resource Planning) or POS (Point of Sale) systems that they use in conjunction with CRM, and they would like the two systems to be able to talk to each other and keep the relevant data in sync. Broadly speaking, this can be handled in two ways:

  • We build a job that runs on a regular basis (ex: nightly at a certain time) that exports all relevant data from one system, does some transformations and imports it into the other.
  • We build a system that provides close to real-time updates between the two (dubbed “near-time”).

Option 1 is by and far the most common, but I personally find option 2 to be the more interesting one. In the Salesforce ecosystem, we have 2 technologies we can employ to build a near-time integration: pushing data from Salesforce (ex: a trigger that makes a web service call to an external system you have already built), or through the Streaming API to listen for changes and react to them. Pushing data from Salesforce can work, but it can also cause problems if changes happen frequently or the system you’re attempting to make calls to becomes unavailable for any reason. I decided that I would try out the Streaming API to see if this helped solve those issues.

In part 1 of this post, I will show some sample code that attempts to build a solution for a near-time integration using the Salesforce Streaming API. In Part 2, I’ll talk about what this solution means from a business perspective along with its merits and pitfalls.

Part 1 – The Code

I decided I wanted to build this near-time process in .NET since it is what I and most of my colleagues are most comfortable in. Salesforce provides this page for getting started in Java, but no similar sample for .NET. I decided I would follow the Java sample and deviate from it where needed to build my .NET application.

When building my application, steps 1 and 2 from the Java sample application were identical, so I won’t cover those here. Starting at step 3 is where I had to start getting creative with the .NET code.

In step 3, the instructions have you download some jar files which provide the necessary CometD implementation to work with the Streaming API (more about CometD here). For .NET, a quick Google search turns up CometD.NET hosted on Github (this is also the version on NuGet). Unfortunately, this version of the library doesn’t allow you to set headers for the outgoing requests, so there’s no way to set the OAuth token like you need to in the Java sample. However, doing a little more digging on Github I noticed a pull request for the library that adds exactly the functionality I needed (located here). So far so good.

In step 4, we finally get to start writing code. To start, here’s the important parts of the final source code.

using System;
using System.Collections.Generic;
using System.Linq;
using Cometd.Client;
using Cometd.Client.Transport;
using CredentialManagement;
using Sample.partner.wsdl;
using System.Collections.Specialized;
using System.Threading;

namespace Sample
{
    class Program
    {
        private const String CRED_KEY = "ndrees@23demo.com";
        private const String CHANNEL = "/topic/InvoiceStatementUpdates";
        private const String STREAMING_ENDPOINT_URI = "/cometd/29.0";

        // long pull durations
        private const int READ_TIMEOUT = 120 * 1000;
        private const int THREAD_TIMEOUT = 60 * 1000;

        static void Main(string[] args)
        {
            try
            {
                RunExample();
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
                Console.WriteLine(e.StackTrace);

                Exception innerException = e.InnerException;
                while (innerException != null)
                {
                    Console.WriteLine(e.Message);
                    Console.WriteLine(e.StackTrace);

                    innerException = innerException.InnerException;
                }
            }
        }

        private static void RunExample()
        {
            BayeuxClient client = null;

            using (var cred = new Credential { Target = CRED_KEY })
            {
                Console.WriteLine("Loading credentials from windows credential vault.");

                if (!cred.Load())
                {
                    Console.WriteLine("Could not find credential with key {0} in windows credential vault.", CRED_KEY);
                    return;
                }

                client = CreateClient(cred);
            }

            Console.WriteLine("Handshaking.");
            client.handshake();
            client.waitFor(1000, new[] { BayeuxClient.State.CONNECTED });

            Console.WriteLine("Connected.");

            client.getChannel(CHANNEL).subscribe(new SampleListener());
            Console.WriteLine("Waiting for data from server...");

            Console.WriteLine("Press any key to shut down.");
            Console.ReadKey();

            Console.WriteLine("Shutting down...");
            client.disconnect();
            client.waitFor(1000, new[] { BayeuxClient.State.DISCONNECTED });
        }

        private static BayeuxClient CreateClient(Credential cred)
        {
            Console.WriteLine("Authenticating with Salesforce.");

            var soapClient = new SoapClient();
            var result = soapClient.login(null, null, cred.Username, cred.Password);
            if (result.passwordExpired)
                throw new ArgumentOutOfRangeException("Password has expired");

            Console.WriteLine("Authenticated.");

            var options = new Dictionary<String, Object>
            {
                { ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT }
            };
            var transport = new LongPollingTransport(options);

            // add the needed auth headers
            var headers = new NameValueCollection();
            headers.Add("Authorization", "OAuth " + result.sessionId);
            transport.AddHeaders(headers);

            // only need the scheme and host, strip out the rest
            var serverUri = new Uri(result.serverUrl);
            String endpoint = String.Format("{0}://{1}{2}", serverUri.Scheme, serverUri.Host, STREAMING_ENDPOINT_URI);

            return new BayeuxClient(endpoint, new[] { transport });
        }
     }
}

 

using System;
using System.Linq;
using Cometd.Bayeux;
using Cometd.Bayeux.Client;

namespace Sample
{
    class SampleListener : IMessageListener
    {
        public void onMessage(IClientSessionChannel channel, IMessage message)
        {
            Console.WriteLine(message);
        }
    }
}

The SampleListener.cs file shows an example of how to respond to messages arriving from Salesforce. This is the class where most of the application’s logic would reside, but for the sample it just prints out the message it receives (a JSON blob). To use the listener, I need to:

  • Authenticate with Salesforce.
  • Create a BayeuxClient so that we can subscribe to the CometD channel.
  • Set the authentication parameters on the client so that we can pass Salesforce’s security checks.
  • Subscribe to channels we care about.

Looking back at the Java sample, they provide a helper class which manually creates and sends the SOAP request to log in. To me, this felt messy, so I wanted to use one of the more standard ways of authenticating. I decided I would use generated SOAP proxies to perform the authentication for me. I wrote a blog post on how to create proxy classes and authenticate with Salesforce in this blog post, so I won’t cover that again here.

After I authenticate with Salesforce, I instantiate a BayeuxClient (the CreateClient method of Program.cs). You’ll notice that after I instantiate the client, I set the authorization header to the session ID returned from the login call. This will only work if you use Couchand’s version of CometD.NET since the AddHeaders method doesn’t exist in the version on NuGet (Oyatel’s version). One other thing to note: I only need the host and protocol of the server URL returned from the login call, not the full server URL. Combining those observations with the Java sample gets me a working BayeuxClient instance.

The rest of the sample follows pretty straight forward from a combination of the Java sample and the instructions on Github on how to use the library. The only final thing worth noting is you need to somehow keep the process alive to be able to receive messages.

Testing this out, I was able to subscribe to the CometD endpoint for my test organization and see inserts and updates pushed to my console application.

Part 2 of this is now available here

Topics: Salesforce