Tamizh in words

Ranging Items In E-Commerce Marketplaces

Published on
Read Time
· 26 min read

In this seventh part of the blog series Building an E-Commerce Marketplace Middleware in Clojure, I am going to share how we captured a business operation from the client's Order Management System(OMS) processed it in a marketplace.

Ranging is an activity in the OMS that turn on the visibility of an item in a marketplace and make it available for sale. The reverse operation is Deranging, which unlist the item from the marketplace. There are two more operations Inventorying and Pricing which updates the inventory and the pricing of the items in the marketplace, respectively.

The back-office team of the Client perform these operations in their OMS. In turn, the OMS communicates the performed action to the middleware through IBM-MQ using XML encoded message.

This blog post is going to be a long one. So, here is a sneak preview of what we'll be learning on the Clojure implementation front.

  • A variant of the Functional Core, Imperative Shell technique in action.
  • More Clojure.Spec (and multi-spec) and asserting the public function parameters using it.
  • XML Parsing & Validation
  • Persisting JSON data in PostgreSQL using Toucan and much more.

Unified Message Handling

The handling logic of all these operational messages will be as follows.

  1. Upon receiving the message, we log the message as an OMS event. It help us to keep track of the messages that we received from the OMS.

  2. Then we parse the message. If it is a failure, we will be logging it as an error in the form of a System event.

  3. If parsing is successful, for each channel in the message, we check whether the given channel exists.

  4. If the channel exists, we will be performing the respective operation in the marketplace. If the processing succeeds, we log it as a domain success event else we log it as a domain failure event.

  5. If the channel not found, we'll be logging as a system event.

Events from steps two to five, treats the OMS event (step one) as the parent event.

In the rest of the blog post, we'll be talking about the implementation of Ranging message and the marketplace Tata-CliQ alone.

Revisiting Event Spec

As a first step towards implementing this unified message handling, let's start from adding the spec for the :oms event type.

# src/wheel/middleware/event.clj

- (s/def ::type #{:domain :system })
+ (s/def ::type #{:domain :system :oms})
; src/wheel/middleware/event.clj
; ...
(defmethod event-type :oms [_]
(s/keys :req-un [::id ::name ::type ::level ::timestamp]))

Adding Payload Spec

All three event types are going to have payloads that provide extra information about an event. To specify different payload specs, we first need to define the different event names.

; src/wheel/middleware/event.clj
; ...
(s/def ::oms-event-name #{:oms/items-ranged})
(s/def ::domain-event-name #{})
(s/def ::system-event-name #{:system/parsing-failed

(s/def ::name ...
; ...
  • :oms/items-ranged - Ranging message received from OMS
  • :system/parsing-failed - Parsing ranging message failed.
  • :system/channel-not-found - Channel specified in the ranging message not found.

We are leaving the domain-event-name spec as an empty set for now.

Earlier we had the had the spec for the name as qualified-keyword?. We have to change it to either one of the above event-name spec.

# src/wheel/middleware/event.clj

- (s/def ::name qualified-keyword?)
+ (s/def ::name (s/or :oms ::oms-event-name
+ :domain ::domain-event-name
+ :system ::system-event-name))

Before defining the payload type for these event-names, let's add the spec for the messages from OMS.

> touch src/wheel/oms/message.clj
; src/wheel/oms/message.clj
(ns wheel.oms.message
(:require [clojure.spec.alpha :as s]))

(s/def ::type #{:ranging})
(s/def ::id uuid?)
(s/def ::message (s/and string? (complement clojure.string/blank?)))

(s/def ::oms-message
(s/keys :req-un [::type ::id ::message]))

Then add the event payload spec as below.

; src/wheel/middleware/event.clj
(ns wheel.middleware.event
(:require ; ...
[wheel.oms.message :as oms-message]))
; ...
(defmulti payload-type :type)

(defmethod payload-type :oms/items-ranged [_]
(s/keys :req-un [::oms-message/message]))

(s/def ::error-message (s/and string? (complement clojure.string/blank?)))

(s/def ::message-type ::oms-message/type)
(defmethod payload-type :system/parsing-failed [_]
(s/keys :req-un [::error-message ::message-type]))

(defmethod payload-type :system/channel-not-found [_]
(s/keys :req-un [::channel-id ::message-type]))

(defmethod payload-type :default [_]
(s/keys :req-un [::type]))
(s/def ::payload (s/multi-spec payload-type :type))

(defmulti event-type ...
; ...

Finally, add this payload spec in all the event spec.

# src/wheel/middleware/event.clj

(defmethod event-type :system [_]
- (s/keys :req-un [::id ::name ::type ::level ::timestamp]
+ (s/keys :req-un [::id ::name ::type ::level ::timestamp ::payload]
:opt-un [::parent-id]))
(defmethod event-type :domain [_]
- (s/keys :req-un [::id ::name ::type ::level ::timestamp
+ (s/keys :req-un [::id ::name ::type ::level ::timestamp ::payload
::channel-id ::channel-name]
:opt-un [::parent-id]))
(defmethod event-type :oms [_]
- (s/keys :req-un [::id ::name ::type ::level ::timestamp]))
+ (s/keys :req-un [::id ::name ::type ::level ::timestamp ::payload]))

System Processing Failed Event

To model the unhandled exception while processing a message from OMS, let's add a new event name :system/processing-failed.

; src/wheel/middleware/event.clj
; ...
(s/def ::system-event-name #{ ;...
; ...
(s/def ::stacktrace (s/and string? (complement clojure.string/blank?)))

(defmethod payload-type :system/processing-failed [_]
(s/keys :req-un [::error-message ::stacktrace]
:opt-un [::message-type]))

; ...

Implementing Unified Message Handler

With the spec for all the possible events in place, now it's time to implement the message handler for the messages from OMS.

Let's start it from the rewriting message listener that we implemented in the last blog post

; src/wheel/infra/oms.clj
(ns wheel.infra.oms
(:require ; ...
[wheel.middleware.event :as event]
[wheel.middleware.core :as middleware] ; <1>
[wheel.infra.log :as log])
; ...
; ...
(defn- message-listener [message-type oms-event-name] ; <2>
(proxy [MessageListener] []
(onMessage [^Message msg]
(let [message (.getBody msg String)
oms-event (event/oms oms-event-name message)] ; <3>
(->> (middleware/handle {:id (:id oms-event) ; <4>
:message message
:type message-type})
(cons oms-event) ; <5>
log/write-all!)) ; <6>
(catch Throwable ex
(log/write! (event/processing-failed ex))))))) ; <7>
; ...

<1> & <4> The namespace wheel.middleware.core doesn't exist yet. We'll be adding it in a few minutes. This namespace is going to have a function handle that takes oms-message and performs the required actions in the marketplace. Then it returns a collection of events that represent the results of these actions. Think of this as a router in a web application.

<2> The rewritten version of the message-listener function now takes two parameters, message-type and oms-event-name. These parameters make it generic for processing the different types of messages from OMS.

<3> & <7> The oms and processing-failed functions in the wheel.middleware.event namespace is also not added yet, and we'll be adding them in the next step. These functions construct an event of type oms and system with the parameters passed.

<5> We are prepending the oms-event with the results from the handle functions. This oms-event is the parent event that triggered all the other events

<6> We are writing all the events in the log using the write-all! function that we defined earlier.

As we have changed the signature of the message-listener function, let's update the ranging-consumer state that we defined using it.

  (mount/defstate ranging-consumer
:start (let [queue-name (:ranging-queue-name (config/oms-settings))
- listener (message-listener)]
+ listener (message-listener :ranging :oms/items-ranged)]
(start-consumer queue-name jms-ranging-session listener))
:stop (stop ranging-consumer))

Here we are creating of message-listener for handling the :ranging message from the OMS, and we name the message received from OMS as :oms/items-ranged

This design follows a variant of the Functional Core, Imperative Shell technique.

Adding Event Create Functions

In the message-listener function, we are calling two functions event/oms to event/processing-failed to create events. These functions don't exist yet. So, let's add it.

# src/wheel/offset-date-time.clj

(ns wheel.offset-date-time
(:require [clojure.spec.alpha :as s])
(:import [java.time.format DateTimeFormatter DateTimeParseException]
- [java.time OffsetDateTime]))
+ [java.time OffsetDateTime ZoneId]))
# ...
+ (defn ist-now []
+ (OffsetDateTime/now (ZoneId/of "+05:30")))
; src/wheel/middleware/event.clj
(ns wheel.middleware.event
(:require ; ...
[clojure.stacktrace :as stacktrace]
[wheel.offset-date-time :as offset-date-time]
[wheel.oms.message :as oms-message])
(:import [java.util UUID]))
; ...

(defn- event [event-name payload &{:keys [level type parent-id
channel-id channel-name]
:or {level :info
type :domain}}] ; <1>
{:post [(s/assert ::event %)]}
(let [event {:id (UUID/randomUUID)
:timestamp (str (offset-date-time/ist-now))
:name event-name
:level level
:type type
:payload (assoc payload :type event-name)}]
(cond-> event
parent-id (assoc :parent-id parent-id)
channel-id (assoc :channel-id channel-id)
channel-name (assoc :channel-name channel-name))))

(defn oms [oms-event-name message]
{:pre [(s/assert ::oms-event-name oms-event-name)
(s/assert ::oms-message/message message)]
:post [(s/assert ::event %)]}
(event oms-event-name
{:message message}
:type :oms))

(defn- ex->map [ex]
{:error-message (with-out-str (stacktrace/print-throwable ex))
:stacktrace (with-out-str (stacktrace/print-stack-trace ex 3))})

(defn processing-failed [ex]
{:post [(s/assert ::event %)]}
(event :system/processing-failed
(ex->map ex)
:type :system
:level :error))

<1> The event function takes the name and payload (without type) of the event along with a set of keyword arguments and constructs a Clojure map that conforms to the event spec.

Let's also add the parsing-failed function to construct the parsing-failed event which we will be using shortly.

; src/wheel/middleware/event.clj
; ...
(defn parsing-failed [parent-id message-type error-message]
{:pre [(s/assert uuid? parent-id)
(s/assert ::oms-message/type message-type)
(s/assert ::error-message error-message)]
:post [(s/assert ::event %)]}
(event :system/parsing-failed
{:error-message error-message
:message-type message-type}
:parent-id parent-id
:type :system
:level :error))

Adding Generic Message Handler

The message-listener function at the application boundary creates the oms-message with the message received from IBM-MQ and pass it to the middleware to handle. This middleware's handle function also not implemented yet. So, let's add it as well.

> touch src/wheel/middleware/core.clj

The messages from OMS are XML encoded. So, the handler has to validate it against an XML schema. If it is valid, then it has to be parsed to a Clojure data structure (sequence of maps).

This parsed data structure is also needed to be validated using clojure.spec to make sure that the message is a processable one. If the validation fails in either one of this, we'll be returning the parsing-failed event.

(ns wheel.middleware.core
(:require [clojure.spec.alpha :as s]
[clojure.java.io :as io]
[wheel.middleware.event :as event]
[wheel.oms.message :as oms-message]
[wheel.xsd :as xsd]))

; <1>
(defmulti xsd-resource-file-path :type)
(defmulti parse :type)
(defmulti spec :type)

(defmulti process (fn [oms-msg parsed-oms-message] ; <2>
(:type oms-msg)))

(defn- validate-message [oms-msg]
(-> (xsd-resource-file-path oms-msg)
(xsd/validate (:message oms-msg)))) ; <3>

(defn handle [{:keys [id type]
:as oms-msg}]
{:pre [(s/assert ::oms-message/oms-message oms-msg)]
:post [(s/assert (s/coll-of ::event/event :min-count 1) %)]}
(if-let [err (validate-message oms-msg)]
[(event/parsing-failed id type err)]
(let [parsed-oms-message (parse oms-msg)]
(if (s/valid? (spec oms-msg) parsed-oms-message)
(process oms-msg parsed-oms-message) ; <4>
id type
(s/explain-str (spec oms-msg) parsed-oms-message))]))))

<1> & <2> We are defining three multi-methods xsd-resource-file-path, parse & spec to get the XML schema file path in the resources directory, parse the XML message to Clojure data structure and to get the expected clojure.spec of the parsed message respectively. The process multi-method abstracts the processing of the parsed message from OMS. Each OMS message type (ranging, deranging, etc.) has to have an implementation for these multi-methods.

<3> The validate-message performs the XML schema-based validation of the incoming message. We'll be adding the wheel.xsd/validate function shortly.

<4> We are dispatching the parsed OMS message to the process multimethod.

Then add a new file xsd.clj and implement the XML validation based on XSD as mentioned in this stackoverflow answer.

> touch src/wheel/xsd.clj
; src/wheel/xsd.clj
(ns wheel.xsd
(:import [javax.xml.validation SchemaFactory]
[javax.xml XMLConstants]
[org.xml.sax SAXException]
[java.io StringReader File]
[javax.xml.transform.stream StreamSource]))

(defn validate [^File xsd-file ^String xml-content]
(let [validator (-> (SchemaFactory/newInstance
(.newSchema xsd-file)
(->> (StringReader. xml-content)
(.validate validator))
(catch SAXException e (.getMessage e)))))

This validate function takes a xsd-file of type java.io.File and the xml-content of type String. It returns either nil if the xml-content conforms to the XSD file provided or the validation error message otherwise.

Adding Ranging Message Handler

A sample ranging message from the OMS would look like this

<EXTNChannelItem ChannelID="UA" EAN="EAN_1" ItemID="SKU1" RangeFlag="Y"/>
<EXTNChannelItem ChannelID="UA" EAN="EAN_2" ItemID="SKU2" RangeFlag="Y"/>
<EXTNChannelItem ChannelID="UB" EAN="EAN_3" ItemID="SKU3" RangeFlag="Y"/>

The EXTNChannelItemList element(s) specifies which channel that we have to communicate and the EXTNChannelItem element(s) determines the items that have to be ranged in that channel.

The XSD file for the ranging message is available in this gist

To keep this XSD file (and the future XSD files), create a new directory oms/message_schema under resources directory and download the gist there.

> mkdir -p resources/oms/message_schema
> wget https://gist.githubusercontent.com/tamizhvendan/4544f0123bd30681be1c5198ed87522c/raw/2c2112bde069f6d002c184e8cfc5a6db77fbebcb/ranging.xsd -P resources/oms/message_schema

# ...
- 'resources/oms/message_schema/ranging.xsd' saved

Then create the ranging.clj file under middleware directly and implement the xsd-resource-file-path multimethod which returns the above XSD file path.

; src/wheel/middleware/ranging.clj
(ns wheel.middleware.ranging
(:require [wheel.middleware.core :as middleware]))

(defmethod middleware/xsd-resource-file-path :ranging [_]

Then let's define the spec for the ranging message.

Given we are receiving the above sample XML as a message, we will be transforming it to a Clojure sequence as below.

({:channel-id "UA", :items ({:ean "EAN_1", :id "SKU1"} 
{:ean "EAN_2 ", :id "SKU2"})}
{:channel-id "UB", :items ({:ean "EAN_3", :id "SKU3"})})

To add a spec for this, Let's add the spec for the id and the ean of the item.

> mkdir src/wheel/oms
> touch src/wheel/oms/item.clj
; src/wheel/oms/item.clj
(ns wheel.oms.item
(:require [clojure.spec.alpha :as s]))

(s/def ::id (s/and string? (complement clojure.string/blank?)))
(s/def ::ean (s/and string? (complement clojure.string/blank?)))

Then use these specs to define the spec for the ranging message and return it in the spec multi-method implementation.

; src/wheel/middleware/ranging.clj
(ns wheel.middleware.ranging
(:require ; ...
[clojure.spec.alpha :as s]
[wheel.oms.item :as oms-item]
[wheel.marketplace.channel :as channel]))

; ...

(s/def ::item
(s/keys :req-un [::oms-item/ean ::oms-item/id]))

(s/def ::items (s/coll-of ::item :min-count 1))

(s/def ::channel-id ::channel/id)
(s/def ::channel-items
(s/keys :req-un [::channel-id ::items]))

(s/def ::message
(s/coll-of ::channel-items :min-count 1))

(defmethod middleware/spec :ranging [_]

The next step is parsing the XML content to a ranging message that satisfies the above spec.

The parse function from clojure.xml namespace parses the XML and returns a tree of XML elements.

wheel.middleware.ranging==> (clojure.xml/parse (java.io.StringBufferInputStream. "{above xml content}"))
{:attrs nil,
:content [{:attrs nil,
:content [{:attrs {:ChannelID "UA", :EAN "UA_EAN_1",
:ItemID "SKU1", :RangeFlag "Y"},
:content nil,
:tag :EXTNChannelItem}
{:attrs {:ChannelID "UA", :EAN "UA_EAN_2 ",
:ItemID "SKU2", :RangeFlag "Y "},
:content nil,
:tag :EXTNChannelItem}],
:tag :EXTNChannelItemList}
{:attrs nil,
:content [{:attrs {:ChannelID "UB", :EAN "UB_EAN_3",
:ItemID "SKU3", :RangeFlag "Y"},
:content nil,
:tag :EXTNChannelItem}],
:tag :EXTNChannelItemList}],
:tag :EXTNChannelList}

And we have to transform it to

({:channel-id "UA", :items ({:ean "EAN_1", :id "SKU1"} 
{:ean "EAN_2 ", :id "SKU2"})}
{:channel-id "UB", :items ({:ean "EAN_3", :id "SKU3"})})

Let's do it

; src/wheel/middleware/ranging.clj
(ns wheel.middleware.ranging
(:require ; ...
[clojure.xml :as xml])
(:import [java.io StringBufferInputStream]))

; ...

(defn- to-item [{:keys [EAN ItemID]}]
{:ean EAN
:id ItemID})

(defmethod middleware/parse :ranging [{:keys [message]}]
(->> (StringBufferInputStream. message)
(mapcat :content)
(map :attrs)
(group-by :ChannelID)
(map (fn [[id xs]]
{:channel-id id
:items (map to-item xs)}))))

Note: This kind of nested data transformation can also be achieved using XML Zippers or Meander.

The last multimethod that we need to define is process. To begin with, let's throw an exception in the implementation.

; src/wheel/middleware/ranging.clj
; ...

(defmethod middleware/process :ranging [_ ranging-message]
(throw (Exception. "todo")))

To load these multimethod definitions during application bootstrap, let's refer this namespace in the infra/core.clj file

; src/wheel/infra/core.clj
(ns wheel.infra.core
(:require ; ...
[wheel.middleware.ranging :as ranging]))
; ...

Revisiting Slack Appender

To make the Slack appender that we added earlier more meaningful, let's change the implementation of the event->attachment and send-to-slack function like below

; src/wheel/infra/log_appender/slack.clj
(ns wheel.infra.log-appender.slack
(:require [wheel.slack.webhook :as slack]
[wheel.infra.config :as config]
[cheshire.core :as json]))

; ...

(defn- event->attachment [{:keys [id channel-id channel-name parent-id payload]
:or {channel-name "N/A"
channel-id "N/A"
parent-id "N/A"}}]
{:color :danger
:fields [{:title "Channel Name"
:value channel-name
:short true}
{:title "Channel Id"
:value channel-id
:short true}
{:title "Event Id"
:value id
:short true}
{:title "Parent Id"
:value parent-id
:short true}
{:title "Payload"
:value (str
(json/generate-string (dissoc payload :type)
{:pretty true})

(defn- send-to-slack [{:keys [msg_]}]
(let [event (read-string (force msg_))
text (event->text event)
attachment (event->attachment event)]
(slack/post-message! (config/slack-log-webhook-url) text [attachment])))
; ...

When we test drive the app by reloading the application in the REPL and putting the following the XML messages in the IBM MQ, we'll get the respective notification in the Slack.


<!-- An empty space in the Channel ID -->
<EXTNChannelItem ChannelID=" " EAN="EAN_1" ItemID="SKU1" RangeFlag="Y"/>

Finally, a valid ranging XML message will throw the "todo" exception.

<EXTNChannelItem ChannelID="UA" EAN="EAN_1" ItemID="SKU1" RangeFlag="Y"/>

Everything is working as expected! Let's move to the final step of processing the ranging message.

Processing Ranging In A Marketplace Channel

The processing of a OMS message in a marketplace channel involves the calling the respective API provided the marketplace. To keep this simple, we are going to a mock a HTTP server that accepts the following HTTP request

POST /channels/UA/ranging HTTP/1.1
Host: localhost:3000
Content-Type: application/json
Authorization: Bearer top-secret!
Accept: */*
Content-Length: 79

	"ean" : "EAN_1",
	"sku" : "SKU_1"
	"ean" : "EAN_2",
	"sku" : "SKU_2"

To perform this action, let's add a tata-cliq API client that implements this fake request for ranging.

> mkdir src/wheel/marketplace/tata_cliq
> touch src/wheel/marketplace/tata_cliq/api.clj
; src/wheel/marketplace/tata_cliq/api.clj
(ns wheel.marketplace.tata-cliq.api
(:require [clj-http.client :as http]))

(defn ranging [{:keys [base-url bearer-token]} channel-id items]
(let [url (str base-url "/channels/" channel-id "/ranging")
auth-header (str "Bearer " bearer-token)]
(http/post url {:form-params items
:content-type :json
:headers {:authorization auth-header}})))

Then update the config.edn to store the channel settings (base-url & bearer-token) and expose it a via a function in config.clj.

; resources/config.edn
{:app ...
:settings {...
:channels {"UA" {:channel-name :tata-cliq
:base-url "http://localhost:3000"
:bearer-token "top-secret!"}
"UB" {:channel-name :tata-cliq
:base-url "http://localhost:3000"
:bearer-token "top-secret!"}}}}}
; src/wheel/infra/config.clj
; ...
(defn get-channel-cofig [channel-id]
(get-in root [:settings :channels channel-id]))

To perform the ranging in the marketplace(s) in response to a message from OMS, we need to define a multimethod process-ranging that dispatches based on the channel-name

; src/wheel/middleware/ranging.clj
; ...
(defmulti process-ranging (fn [{:keys [channel-name]}

(defmethod middleware/process ... )

Then rewrite the middleware/process multimethod implementation to iterate through each channel in the ranging message, and invoke the process-ranging method after getting their configuration.

; src/wheel/middleware/ranging.clj
(ns wheel.middleware.ranging
(:require ; ...
[middleware.infra.event :as event]))
; ...

(defmethod middleware/process :ranging [{:keys [id]
:as oms-msg} ranging-message]
(for [{:keys [channel-id]
:as ch-ranging-message} ranging-message]
(if-let [channel-config (config/get-channel-cofig channel-id)]
(process-ranging channel-config oms-msg ch-ranging-message)
(catch Throwable ex
(event/processing-failed ex id :ranging channel-id (:channel-name channel-config))))
(event/channel-not-found id :ranging channel-id))))

The channel-not-found function and processing-failed function overload are not available, so let's add them.

; src/wheel/middleware/event.clj
; ...
(defn processing-failed
; ... existing implementation
([ex parent-id message-type channel-id channel-name]
{:post [(s/assert ::event %)]}
(event :system/processing-failed
(assoc (ex->map ex) :message-type message-type)
:parent-id parent-id
:channel-id channel-id
:channel-name channel-name
:level :error)))
; ...
(defn channel-not-found [parent-id message-type channel-id]
{:pre [(s/assert uuid? parent-id)
(s/assert ::oms-message/type message-type)
(s/assert ::channel/id channel-id)]
:post [(s/assert ::event %)]}
(event :system/channel-not-found
{:channel-id channel-id
:message-type message-type}
:parent-id parent-id
:type :system
:level :error))

As we'll be adding the ranging/succeeded event, let's add the spec for this as well.

; src/wheel/middleware/event.clj
(s/def ::domain-event-name #{:ranging/succeeded})
; ...
(s/def ::ranged-item
(s/keys :req-un [::item/ean ::item/id]))
(s/def ::ranged-items (s/coll-of ::ranged-item :min-count 1))
(defmethod payload-type :ranging/succeeded [_]
(s/keys :req-un [::ranged-items]))

(defn ranging-succeeded [parent-id channel-id channel-name items]
{:pre [(s/assert uuid? parent-id)
(s/assert ::channel/id channel-id)
(s/assert ::channel/name channel-name)
(s/assert ::ranged-items items)]
:post [(s/assert ::event %)]}
(event :ranging/succeeded
{:ranged-items items}
:parent-id parent-id
:channel-id channel-id
:channel-name channel-name))

The final piece left is defining the tata-cliq implementation of the process-ranging multimethod.

> touch src/wheel/marketplace/tata_cliq/core.clj
; src/wheel/marketplace/tata_cliq/core.clj
(ns wheel.marketplace.tata-cliq.core
(:require [wheel.marketplace.tata-cliq.api :as tata-cliq]
[wheel.middleware.ranging :as ranging]
[wheel.oms.message :as oms-message]
[wheel.middleware.event :as event]
[clojure.spec.alpha :as s]
[clojure.set :as set]))

(defmethod ranging/process-ranging :tata-cliq
[{:keys [channel-name]
:as channel-config}
{:keys [id]
:as oms-msg}
{:keys [channel-id items]
:as channel-items}]
{:pre [(s/assert ::oms-message/oms-message oms-msg)
(s/assert ::ranging/channel-items channel-items)]}
(tata-cliq/ranging channel-config channel-id
(map #(set/rename-keys % {:id :sku}) items)) ; <1>
(event/ranging-succeeded id channel-id channel-name items))

<1> The item in tata-cliq API doesn't have id instead it uses sku.

; src/wheel/infra/core.clj
(ns wheel.infra.core
(:require ; ...
[wheel.marketplace.tata-cliq.core :as tata-cliq]))
; ...

Setting Up Mock Server.

To set up the Mock Server for tata-cliq we are going to use Mockon. The fake configuration is going to return HTTP Response 200 for the channel-id UA and 500 for the channel-id UB. This Mockon setup is available in this gist. You can import it in the Mockon and start the Mockon server.

With this mock server, if we do a test drive of the implementation, we'll get the following output in the Slack.

<EXTNChannelItem ChannelID="UC" EAN="EAN_1" ItemID="SKU1" RangeFlag="Y"/>

If we try with the channel id UB, we'll get the processing failed exception.

<EXTNChannelItem ChannelID="UB" EAN="EAN_1" ItemID="SKU1" RangeFlag="Y"/>

For the channel id UA, we'll get the ranging succeeded as expected in the standard output log.

<EXTNChannelItem ChannelID="UA" EAN="EAN_1" ItemID="SKU1" RangeFlag="Y"/>
"payload": {
"ranged-items": [
"ean": "EAN_1",
"id": "SKU1"
"type": "ranging/succeeded"
"name": "ranging/succeeded",
"type": "domain",
"channel-name": "tata-cliq",
"level": "info",
"id": "f01e987b-bc72-41e9-9376-b362c3509273",
"parent-id": "8c22c6c1-9e00-463e-83bb-b2a77ab135a1",
"channel-id": "UA",
"timestamp": "2019-10-18T00:19:46.192+05:30"

Fixing DB Appender

In the database appender that we added earlier, we need to modify to support all the event types, and we also need to persist the event payload. To do it, let's add the database migration script.

> touch resources/db/migration/V201910180025__alter_event.sql
-- resources/db/migration/V201910180025__alter_event.sql
CREATE TYPE event_type AS ENUM ('domain', 'oms', 'system');

ALTER TABLE event ADD COLUMN type event_type NOT NULL DEFAULT 'domain';

The next step is adding the event_type and the jsonb type in the Toucan setup and use them in the event model definition.

; src/wheel/infra/database.clj
(ns wheel.infra.database
(:require ;...
[cheshire.core :as json]))
; ...
(defn- to-pg-jsonb [value]
(doto (PGobject.)
(.setType "jsonb")
(.setValue (json/generate-string value))))

(defn- configure-toucan []
(models/add-type! :event-type
:in (pg-object-fn "event_type")
:out keyword)
(models/add-type! :jsonb
:in to-pg-jsonb
:out #(json/parse-string (.getValue %) true)))
; src/wheel/model/event.clj
; ...
(models/defmodel Event :event
(types [_]
{; ...
:type :event-type
:payload :jsonb}))
; ...

Then rewrite the create! event function as below

; src/wheel/model/event.clj
; ...
(defn create! [new-event]
{:pre [(s/assert ::event/event new-event)]}
(db/insert! Event
(update new-event :timestamp timestamp->offset-date-time)))

Finally, rewrite the append-to-db function in the database appender to log all the event types.

; src/wheel/infra/log_appender/database.clj
; ...
(defn- append-to-db [{:keys [msg_]}]
(let [evnt (read-string (force msg_))]
(event/create! evnt)))
; ...

To test these changes, run the migration script and reset the app from the REPL

user=> (migrate-database)
{:stopped ["#'wheel.infra.database/datasource"]}
user=> (reset)
{:started [...]}

Then put the valid XML ranging message of channel UA in IBM-MQ, we should be able to see the new events in the database.

That's it!


Thanks for reading the whole article. I believe you'd have got a high-level idea of our project design and implementation. Feel free to drop a comment if you'd like to discuss further!

In the next blog post, we are going to implement the other side of the communication. Marketplace to OMS in which we'll be implementing cron jobs that fetch the information from the marketplace and relay it to the OMS. Stay tuned!

The source code associated with this part is available on this GitHub repository.

Did the content capture your interest? Stay in the loop by subscribing to the RSS feed and staying informed!