Apache Camel is an open source integration framework that lets you quickly and easily integrate systems that consume or produce data.
This library adds a thin layer on top of Apache Camel (Java) and provides a more idiomatic experience when using it from Clojure, without changing the original functionality.
Include in your project.clj
(require '[clj-camel.core :as c]
         '[clj-camel.util :as cu])

Simple route
(c/route-builder (c/from "direct:test")
                 (c/route-id "test-route")
                 (c/set-body (c/constant "test-body"))
                 (c/log "log: ${body}")
                 (c/to "direct:result"))

Filter (original doc)
(c/route-builder (c/from "direct:test")
                 (c/route-id "test-route")
                 (c/to "http://test-http")
                 (c/filter (c/predicate (comp pos? :body))
                           (c/log "Filtered ... ${body}")
                           (c/to "direct:result"))
                 (c/process (fn [_] {:body "after filter"})))

Choice (original doc)
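As a rough mental model, a choice takes the first `when` branch whose predicate matches and falls back to `otherwise` when none does. The sketch below mimics that with a plain `cond`. It is an illustration only, not how clj-camel implements it: it assumes the function passed to `c/predicate` sees the exchange as a map with a `:body` key (which the examples here suggest but do not state), and `choose-branch` is a hypothetical helper name.

```clojure
;; Illustration only: plain Clojure, no Camel or clj-camel involved.
;; Assumption: predicates receive the exchange as a map with a :body key.
(def positive-body? (comp pos? :body))
(def negative-body? (comp neg? :body))

(defn choose-branch
  "Returns the log label of the branch that would be taken for `exchange`."
  [exchange]
  (cond
    (positive-body? exchange) "when 1"
    (negative-body? exchange) "when 2"
    :else                     "otherwise"))

(choose-branch {:body 5})   ;; => "when 1"
(choose-branch {:body -3})  ;; => "when 2"
(choose-branch {:body 0})   ;; => "otherwise"
```

A body of zero is neither positive nor negative, so it ends up in the `otherwise` branch.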
(c/route-builder (c/choice (c/when (c/predicate (comp pos? :body))
                                   (c/log "when 1")
                                   (c/process some-processor))
                           (c/when (c/predicate (comp neg? :body))
                                   (c/log "when 2")
                                   (c/process some-processor))
                           (c/otherwise
                             (c/log "otherwise")
                             (c/process some-processor)))
                 (c/log "after choice"))

Split (original doc)
(c/route-builder (c/from "direct:test")
                 (c/route-id "test-route")
                 (c/process processor1)
                 (c/to "http://test-http")
                 (c/split (c/json-path "$.data.*") {:agg-strategy        c/grouped-exchange-strategy
                                                    :streaming           true
                                                    :parallel-processing true}
                          (c/process (fn [_] {}))
                          (c/filter (c/predicate (comp pos? :reserved-today :body))
                                    (c/log "Filtered ... ${body}")
                                    (c/to "direct:result")))
                 (c/process (fn [_] {:body "after"})))

Aggregate (original doc)
(c/route-builder (c/from "direct:test")
                 (c/set-body (c/constant "test"))
                 (c/aggregate (c/constant 1) c/grouped-body-strategy
                              {:completion-size      1000
                               :completion-timeout   1000
                               :completion-predicate (c/predicate (fn [_] true))}
                              (c/to "direct:result"))
                 (c/log "after aggregating"))

Caching (original doc)
(c/route-builder (c/from "direct:test")
                 (c/route-id "test-route")
                 (c/set-body (c/constant "key"))
                 (c/log "key requested: ${body}")
                 (c/memoize (cu/create-jcache-expiration-policy "cache-name" 60)
                            (c/set-body (c/constant "value"))
                            (c/log "Populate cache with ${body}"))
                 (c/log "key value result: ${body}")
                 (c/to "direct:result"))

Throttling (original doc)
(c/route-builder (c/from "direct:test")
                 (c/set-body (c/constant "test"))
                 (c/throttle 20 {:async-delayed      false
                                 :reject-execution   false
                                 :time-period-millis 10000})
                 (c/log "after throttling")
                 (c/to "direct:result"))

Try/Catch/Finally (original doc)
(c/route-builder (c/from "direct:test")
                 (c/route-id "test-route")
                 (c/do-try (c/to "http://test-http")
                           (c/do-catch Exception
                                       (c/log "handle exception")
                                       (c/log "handle exception2"))
                           (c/do-finally
                             (c/log "finally")
                             (c/log "finally2")))
                 (c/log "after do-try"))

MDC from headers UOW
The following populates the MDC field name-of-mdc-field with the value of the incoming exchange header name-of-header-field, provided the incoming exchange has that header.
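The header-to-MDC mapping described above can be pictured with a small pure-Clojure sketch. This is not the clj-camel implementation; `mdc-from-headers` is a hypothetical helper that only shows the intended effect: each mapped header that is present contributes one MDC entry, and missing headers are skipped.

```clojure
;; Illustration only: plain Clojure, not the clj-camel implementation.
(defn mdc-from-headers
  "Given exchange headers and a {header-name mdc-field} mapping, returns the
  MDC entries for the headers that are actually present."
  [headers mapping]
  (into {}
        (keep (fn [[header-name mdc-field]]
                (when-some [value (get headers header-name)]
                  [mdc-field value])))
        mapping))

(mdc-from-headers {"name-of-header-field" "test-value"}
                  {"name-of-header-field" "name-of-mdc-field"})
;; => {"name-of-mdc-field" "test-value"}

(mdc-from-headers {} {"name-of-header-field" "name-of-mdc-field"})
;; => {}
```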
Example of log:
{
  "message": "example message",
  "mdc": {
    "name-of-mdc-field": "test-value",
    "camel.breadcrumbId": "xxx",
    "camel.routeId": "yyy"
  }
}

(c/set-customer-uow-with-mdc-from-headers context {"name-of-header-field" "name-of-mdc-field"})

GCP Pub-sub attributes propagation
The specified Pub/Sub message attributes are added to the exchange headers if they exist on the message.
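To make the "if they exist" condition concrete, here is a minimal pure-Clojure sketch of the propagation. It is an illustration under assumptions, not the library's code: `propagate-attributes` is a hypothetical helper, and headers and attributes are modelled as plain maps.

```clojure
;; Illustration only: plain Clojure, not the clj-camel implementation.
(defn propagate-attributes
  "Copies each mapped Pub/Sub attribute that is present in `attributes`
  into `headers` under the configured header name."
  [headers attributes mapping]
  (reduce-kv (fn [acc attribute-name header-name]
               (if (contains? attributes attribute-name)
                 (assoc acc header-name (get attributes attribute-name))
                 acc))
             headers
             mapping))

(propagate-attributes {"existing" "1"}
                      {"pubsub-attribute-name" "abc"}
                      {"pubsub-attribute-name" "name-of-header-field"})
;; => {"existing" "1", "name-of-header-field" "abc"}
```

When the attribute is absent, the headers are returned unchanged.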
(c/set-pubsub-attributes-propagation context {"pubsub-attribute-name" "name-of-header-field"})

When configuring your logging appenders, contrary to the Camel documentation, you will need to add clj-camel.core as your logging namespace.
- The authentication mechanism in google-pubsub-component was changed: serviceAccountKey became a mandatory parameter in the google-pubsub endpoint string. See https://camel.apache.org/components/3.11.x/google-pubsub-component.html#_authentication_configuration
- The interface of the Exchange class changed: getAllProperties should be used instead of the getProperties method
- apm-opentracing dependencies may need to be updated
- fasterxml/jackson dependencies may need to be updated
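For the serviceAccountKey note above, the following sketch builds an endpoint string that carries the parameter. It assumes the `google-pubsub:projectId:destinationName` URI format from the Camel component documentation; `pubsub-endpoint`, the project and topic names, and the key path are all hypothetical placeholders.

```clojure
;; Illustration only: builds a google-pubsub endpoint URI as a plain string.
;; All argument values used below are placeholders, not real resources.
(defn pubsub-endpoint
  "Returns a google-pubsub endpoint URI that includes serviceAccountKey."
  [project-id destination service-account-key]
  (str "google-pubsub:" project-id ":" destination
       "?serviceAccountKey=" service-account-key))

(pubsub-endpoint "my-project" "my-topic" "file:/path/to/key.json")
;; => "google-pubsub:my-project:my-topic?serviceAccountKey=file:/path/to/key.json"
```

The resulting string could then be passed wherever a route takes an endpoint, for example to c/to.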