Skip to content

Receiving messages using NetMessagingBinding with custom formats

March 15, 2013

As a follow-up to my earlier post on Formatting the content for Service Bus messages, where we walked through the BrokeredMessage APIs that allow you to have custom formats for message content, I wanted to cover how you can use NetMessagingBinding (WCF) to receive such messages. This scenario is relevant when your clients/senders are using the REST API or non-.NET SDKs like Java, Python, PHP and Node.js, and your servers/receivers want to use WCF to process the messages.

NetMessagingBinding always builds a channel stack with BinaryMessageEncodingBindingElement + NetMessagingTransportBindingElement. If the BrokeredMessages in your Service Bus Queue/Subscription are plain text or XML, then BinaryMessageEncoding won’t work; with WCF you’d have to use a CustomBinding with TextMessageEncodingBindingElement and NetMessagingTransportBindingElement instead.

You need to use a CustomBinding with a TextMessageEncodingBindingElement (with MessageVersion = None) and a NetMessagingTransportBindingElement. Also make sure the operation uses Action="*", and set AddressFilterMode=Any on your ServiceBehavior.

Here are two ways to read a plain XML message using NetMessagingTransportBindingElement:

Solution #1: Use System.ServiceModel.Channels.Message in the ServiceContract and call Message.GetBody<T>()

namespace MessagingConsole
{
    /// <summary>
    /// Constant values shared by the data contract and the sample payload.
    /// </summary>
    internal static class Constants
    {
        /// <summary>
        /// XML namespace used for the Record data contract and for the
        /// hand-built interop payload.
        /// </summary>
        public const string ContractNamespace = "http://contoso";
    }

    /// <summary>
    /// Data contract for the record carried in the message body. It is
    /// serialized in the <see cref="Constants.ContractNamespace"/> XML namespace.
    /// </summary>
    [DataContract(Namespace = Constants.ContractNamespace)]
    internal class Record
    {
        /// <summary>Identifier of the record.</summary>
        [DataMember]
        public string Id { get; set; }
    }

    /// <summary>
    /// Service contract that receives the untyped WCF message.
    /// </summary>
    [ServiceContract]
    internal interface ITestContract
    {
        /// <summary>
        /// One-way operation declared with the wildcard action ("*"), so it is
        /// not bound to one specific message action.
        /// </summary>
        /// <param name="message">The incoming WCF message.</param>
        [OperationContract(Action = "*", IsOneWay = true)]
        void UpdateRecord(Message message);
    }

    /// <summary>
    /// Service implementation that reads a <see cref="Record"/> from the raw
    /// message and prints its Id.
    /// </summary>
    /// <remarks>
    /// AddressFilterMode.Any is another way to avoid the
    /// "The message with To '' cannot be processed at the receiver..." error.
    /// </remarks>
    [ServiceBehavior(AddressFilterMode = AddressFilterMode.Any)]
    internal class TestService : ITestContract
    {
        /// <summary>
        /// Deserializes the message body as a <see cref="Record"/> and writes
        /// its Id to the console.
        /// </summary>
        /// <param name="message">The incoming WCF message.</param>
        [OperationBehavior]
        public void UpdateRecord(Message message)
        {
            Record payload = message.GetBody<Record>();
            string line = string.Concat("UpdateRecord called! ", payload.Id);
            Console.WriteLine(line);
        }
    }

    /// <summary>
    /// Console host for Solution #1. Sends one plain-XML message to the topic,
    /// then hosts <see cref="TestService"/> on the subscription using a text
    /// encoder plus the Service Bus messaging transport.
    /// </summary>
    class ServiceProgram
    {
        /// <summary>
        /// Sends the sample payload, opens the service host and keeps it
        /// running until the user presses Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            string connectionString = "Endpoint=sb://<Service Bus connection string from Azure portal>";
            string topicPath = "Topic2";
            string subscriptionName = "Sub0";

            MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
            TopicClient sender = factory.CreateTopicClient(topicPath);

            // Send the body as raw UTF-8 XML bytes in a stream, the way a
            // REST or non-.NET sender would, rather than as a serialized object.
            string interopPayload = "<Record xmlns='" + Constants.ContractNamespace + "'><Id>4</Id></Record>";
            BrokeredMessage interopMessage = new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes(interopPayload)), true);
            sender.Send(interopMessage);

            // Text encoding with MessageVersion.None instead of the binary
            // encoder that NetMessagingBinding would use.
            CustomBinding binding = new CustomBinding(
                new TextMessageEncodingBindingElement { MessageVersion = MessageVersion.None },
                new NetMessagingTransportBindingElement());

            // Reuse the namespace address and credentials the factory obtained
            // from the connection string, so the listener and the sender
            // target the same Service Bus namespace.
            Uri namespaceAddress = factory.Address;
            TokenProvider tokenProvider = factory.GetSettings().TokenProvider;

            ServiceHost serviceHost = new ServiceHost(typeof(TestService), namespaceAddress);
            ServiceEndpoint endpoint = serviceHost.AddServiceEndpoint(typeof(ITestContract), binding, topicPath + "/Subscriptions/" + subscriptionName);
            endpoint.Behaviors.Add(new TransportClientEndpointBehavior(tokenProvider));
            serviceHost.Open();
            Console.WriteLine("Service is running");
            Console.ReadLine();

            // Shut down the listener and the messaging factory before exiting.
            serviceHost.Close();
            factory.Close();
        }
    }
}

Solution #2: Define a MessageContract data type to make the expected SOAP contract match what the interop client is sending:

namespace MessagingConsole
{
    /// <summary>
    /// Constant values shared by the contracts and the sample payload.
    /// </summary>
    internal static class Constants
    {
        /// <summary>
        /// XML namespace used for the Record data contract, the message body
        /// member and the hand-built interop payload.
        /// </summary>
        public const string ContractNamespace = "http://contoso";
    }

    /// <summary>
    /// Data contract for the record carried in the message body. It is
    /// serialized in the <see cref="Constants.ContractNamespace"/> XML namespace.
    /// </summary>
    [DataContract(Namespace = Constants.ContractNamespace)]
    internal class Record
    {
        /// <summary>Identifier of the record.</summary>
        [DataMember]
        public string Id { get; set; }
    }

    /// <summary>
    /// Unwrapped message contract whose single body member is the
    /// <see cref="Record"/> element, so the expected body matches the bare XML
    /// that the interop sender puts on the wire.
    /// </summary>
    [MessageContract(IsWrapped = false)]
    internal class RecordMessageContract
    {
        /// <summary>
        /// The record carried as the message body, in the
        /// <see cref="Constants.ContractNamespace"/> XML namespace.
        /// </summary>
        [MessageBodyMember(Namespace = Constants.ContractNamespace)]
        public Record Record { get; set; }
    }

    /// <summary>
    /// Service contract that receives the record as a typed message contract.
    /// </summary>
    [ServiceContract]
    internal interface ITestContract
    {
        /// <summary>
        /// One-way operation declared with the wildcard action ("*"), so it is
        /// not bound to one specific message action.
        /// </summary>
        /// <param name="recordMessageContract">The incoming message, typed as a message contract.</param>
        [OperationContract(Action = "*", IsOneWay = true)]
        void UpdateRecord(RecordMessageContract recordMessageContract);
    }

    /// <summary>
    /// Console host for Solution #2. Sends one plain-XML message to the topic,
    /// then hosts <see cref="TestService"/> on the subscription using a text
    /// encoder plus the Service Bus messaging transport.
    /// </summary>
    class ServiceProgram
    {
        /// <summary>
        /// Sends the sample payload, opens the service host and keeps it
        /// running until the user presses Enter.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            string connectionString = "Endpoint=sb://<Service Bus connection string from Azure portal>";
            // Same topic and subscription names as the Solution #1 sample.
            string topicPath = "Topic2";
            string subscriptionName = "Sub0";

            MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);
            TopicClient sender = factory.CreateTopicClient(topicPath);

            // Send the body as raw UTF-8 XML bytes in a stream, the way a
            // REST or non-.NET sender would, rather than as a serialized object.
            string interopPayload = "<Record xmlns='" + Constants.ContractNamespace + "'><Id>5</Id></Record>";
            BrokeredMessage interopMessage = new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes(interopPayload)), true);
            sender.Send(interopMessage);

            // Text encoding with MessageVersion.None instead of the binary
            // encoder that NetMessagingBinding would use.
            CustomBinding binding = new CustomBinding(
                new TextMessageEncodingBindingElement { MessageVersion = MessageVersion.None },
                new NetMessagingTransportBindingElement());

            // Reuse the namespace address and credentials the factory obtained
            // from the connection string, so the listener and the sender
            // target the same Service Bus namespace.
            Uri namespaceAddress = factory.Address;
            TokenProvider tokenProvider = factory.GetSettings().TokenProvider;

            ServiceHost serviceHost = new ServiceHost(typeof(TestService), namespaceAddress);
            ServiceEndpoint endpoint = serviceHost.AddServiceEndpoint(typeof(ITestContract), binding, topicPath + "/Subscriptions/" + subscriptionName);
            endpoint.Behaviors.Add(new TransportClientEndpointBehavior(tokenProvider));
            serviceHost.Open();
            Console.WriteLine("Service is running");
            Console.ReadLine();

            // Shut down the listener and the messaging factory before exiting.
            serviceHost.Close();
            factory.Close();
        }
    }

    /// <summary>
    /// Service implementation that prints the Id of the record carried by each
    /// received message contract.
    /// </summary>
    [ServiceBehavior(AddressFilterMode = AddressFilterMode.Any)]
    internal class TestService : ITestContract
    {
        /// <summary>
        /// Reads the <see cref="Record"/> from the message contract and writes
        /// its Id to the console.
        /// </summary>
        /// <param name="recordMessageContract">The incoming message, typed as a message contract.</param>
        [OperationBehavior]
        public void UpdateRecord(RecordMessageContract recordMessageContract)
        {
            Record payload = recordMessageContract.Record;
            string line = string.Concat("UpdateRecord called! ", payload.Id);
            Console.WriteLine(line);
        }
    }
}
About these ads
One Comment

Trackbacks & Pingbacks

  1. Scott Banwart's Blog › Distributed Weekly 199

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: