001  (ns clj-wamp.server
002    ^{:author "Christopher Martin"
003      :doc "Clojure implementation of the WebSocket Application Messaging Protocol"}
004    (:use [clojure.core.incubator :only [dissoc-in]]
005          [clojure.string :only [split trim lower-case]])
006    (:require [clojure.java.io :as io]
007              [org.httpkit.server :as httpkit]
008              [org.httpkit.timer :as timer]
009              [cheshire.core :as json]
010              [clojure.tools.logging :as log]
011              [clojure.data.codec.base64 :as base64])
012    (:import [org.httpkit.server AsyncChannel]
013             [javax.crypto Mac]
014             [javax.crypto.spec SecretKeySpec]))
015  
016  (declare send!)
017  
018  (def ^:const TYPE-ID-WELCOME     0) ; Server-to-client (Aux)
019  (def ^:const TYPE-ID-PREFIX      1) ; Client-to-server (Aux)
020  (def ^:const TYPE-ID-CALL        2) ; Client-to-server (RPC)
021  (def ^:const TYPE-ID-CALLRESULT  3) ; Server-to-client (RPC)
022  (def ^:const TYPE-ID-CALLERROR   4) ; Server-to-client (RPC)
023  (def ^:const TYPE-ID-SUBSCRIBE   5) ; Client-to-server (PubSub)
024  (def ^:const TYPE-ID-UNSUBSCRIBE 6) ; Client-to-server (PubSub)
025  (def ^:const TYPE-ID-PUBLISH     7) ; Client-to-server (PubSub)
026  (def ^:const TYPE-ID-EVENT       8) ; Server-to-client (PubSub)
027  
028  (def ^:const URI-WAMP-BASE            "http://api.wamp.ws/")
029  (def ^:const URI-WAMP-ERROR           (str URI-WAMP-BASE "error#"))
030  (def ^:const URI-WAMP-PROCEDURE       (str URI-WAMP-BASE "procedure#"))
031  (def ^:const URI-WAMP-CALL-AUTHREQ    (str URI-WAMP-PROCEDURE "authreq"))
032  (def ^:const URI-WAMP-CALL-AUTH       (str URI-WAMP-PROCEDURE "auth"))
033  (def ^:const URI-WAMP-TOPIC           (str URI-WAMP-BASE "topic#"))
034  (def ^:const URI-WAMP-ERROR-GENERIC   (str URI-WAMP-ERROR "generic"))
035  (def ^:const DESC-WAMP-ERROR-GENERIC  "generic error")
036  (def ^:const URI-WAMP-ERROR-INTERNAL  (str URI-WAMP-ERROR "internal"))
037  (def ^:const DESC-WAMP-ERROR-INTERNAL "internal error")
038  (def ^:const URI-WAMP-ERROR-NOTFOUND  (str URI-WAMP-ERROR "notfound"))
039  (def ^:const DESC-WAMP-ERROR-NOTFOUND "not found error")
040  (def ^:const DESC-WAMP-ERROR-NOAUTH   "unauthorized")
041  (def ^:const URI-WAMP-ERROR-NOAUTH    (str URI-WAMP-ERROR "unauthorized"))
042  
043  (def project-version "clj-wamp/1.0.0-rc1")
044  
045  (def max-sess-id (atom 0))
046  
047  (defn- next-sess-id []
048    (swap! max-sess-id inc))
049  
050  
051  ;; Client utils
052  
053  (def client-channels (ref {}))
054  (def client-prefixes (ref {}))
055  (def client-auth     (ref {}))
056  
057  (defn add-client
058    "Adds a websocket channel (or callback function) to a map of clients
059    and returns a unique session id."
060    [channel-or-fn]
061    (let [sess-id (str (System/currentTimeMillis) "-" (next-sess-id))]
062      (dosync (alter client-channels assoc sess-id channel-or-fn))
063      sess-id))
064  
065  (defn get-client-channel
066    "Returns the channel (or callback function) for a websocket client's
067    session id."
068    [sess-id]
069    (get @client-channels sess-id))
070  
071  (defn del-client
072    "Removes a websocket session from the map of clients."
073    [sess-id]
074    (dosync
075      (alter client-channels dissoc sess-id)
076      (alter client-prefixes dissoc sess-id)
077      (alter client-auth     dissoc sess-id)))
078  
079  (defn add-topic-prefix
080    "Adds a new CURI topic prefix for a websocket client."
081    [sess-id prefix uri]
082    (log/trace "New CURI Prefix [" sess-id "]" prefix uri)
083    (dosync
084      (alter client-prefixes assoc-in [sess-id prefix] uri)))
085  
086  (defn get-topic
087    "Returns the full topic URI for a prefix. If prefix does not exist,
088    returns the CURI passed in."
089    [sess-id curi]
090    (let [topic (split curi #":")
091          prefix (first topic)
092          suffix (second topic)]
093      (if-let [uri (get-in @client-prefixes [sess-id prefix])]
094        (str uri suffix)
095        curi)))
096  
097  (defn close-channel
098    ([sess-id]
099      (close-channel sess-id 1002))
100    ([sess-id code]
101      (when-let [channel (get-client-channel sess-id)]
102        (if (fn? channel)
103          (httpkit/close channel) ; for unit testing
104          (.serverClose channel code)) ; TODO thread-safe? (locking AsyncChannel ...) ?
105        (log/trace "Channel closed" code))))
106  
107  ;; Topic utils
108  
109  (def client-topics (ref {}))
110  (def topic-clients (ref {}))
111  
112  (defn topic-subscribe
113    "Subscribes a websocket session to a topic."
114    [topic sess-id]
115    (dosync
116      (alter topic-clients assoc-in [topic sess-id] true)
117      (alter client-topics assoc-in [sess-id topic] true)))
118  
119  (defn topic-unsubscribe
120    "Unsubscribes a websocket session from a topic."
121    [topic sess-id]
122    (dosync
123      (alter topic-clients dissoc-in [topic sess-id])
124      (alter client-topics dissoc-in [sess-id topic])))
125  
126  (defn- topic-send!
127    "Sends an event to *all* websocket clients subscribed to a topic."
128    [topic & data]
129    (dosync
130      (doseq [[sess-id _] (@topic-clients topic)]
131        (apply send! sess-id data))))
132  
133  (defn- topic-broadcast!
134    "Send an event to websocket clients subscribed to a topic,
135    except those excluded."
136    [topic excludes & data]
137    (let [excludes (if (sequential? excludes) excludes [excludes])]
138      (dosync
139        (doseq [[sess-id _] (@topic-clients topic)]
140          (if (not-any? #{sess-id} excludes)
141            (apply send! sess-id data))))))
142  
143  (defn- topic-emit!
144    "Sends an event to a specific list of websocket clients subscribed
145    to a topic."
146    [topic includes & data]
147    (let [includes (if (sequential? includes) includes [includes])]
148      (dosync
149        (doseq [[sess-id _] (@topic-clients topic)]
150          (if (some #{sess-id} includes)
151            (apply send! sess-id data))))))
152  
153  (defn get-topic-clients [topic]
154    "Returns all client session ids within a topic."
155    (if-let [clients (@topic-clients topic)]
156      (keys clients)))
157  
158  ;; WAMP websocket send! utils
159  
160  (defn- send!
161    "Sends data to a websocket client."
162    [sess-id & data]
163    (dosync
164      (let [channel-or-fn (get-client-channel sess-id)
165            json-data     (json/encode data {:escape-non-ascii true})]
166        (log/trace "Sending data:" data)
167        (if (fn? channel-or-fn) ; application callback?
168          (channel-or-fn data)
169          (when channel-or-fn
170            (httpkit/send! channel-or-fn json-data))))))
171  
172  (defn send-welcome!
173    "Sends a WAMP welcome message to a websocket client.
174    [ TYPE_ID_WELCOME , sessionId , protocolVersion, serverIdent ]"
175    ([sess-id]
176      (send-welcome! sess-id 1 project-version))
177    ([sess-id protocol-ver server-ident]
178      (send! sess-id TYPE-ID-WELCOME sess-id protocol-ver server-ident)))
179  
180  (defn send-call-result!
181    "Sends a WAMP call result message to a websocket client.
182    [ TYPE_ID_CALLRESULT , callID , result ]"
183    [sess-id call-id result]
184    (send! sess-id TYPE-ID-CALLRESULT call-id result))
185  
186  (defn send-call-error!
187    "Sends a WAMP call error message to a websocket client.
188    [ TYPE_ID_CALLERROR , callID , errorURI , errorDesc [, errorDetails] ]"
189    ([sess-id call-id err-uri err-desc]
190      (send-call-error! sess-id call-id err-uri err-desc nil))
191    ([sess-id call-id err-uri err-desc err-details]
192      (if (nil? err-details)
193        (send! sess-id TYPE-ID-CALLERROR call-id err-uri err-desc)
194        (send! sess-id TYPE-ID-CALLERROR call-id err-uri err-desc err-details))))
195  
196  (defn send-event!
197    "Sends an event message to all clients in topic.
198    [ TYPE_ID_EVENT , topicURI , event ]"
199    [topic event]
200    (topic-send! topic TYPE-ID-EVENT topic event))
201  
202  (defn broadcast-event!
203    "Sends an event message to all clients in a topic but those excluded."
204    [topic event excludes]
205    (topic-broadcast! topic excludes TYPE-ID-EVENT topic event))
206  
207  (defn emit-event!
208    "Sends an event message to specific clients in a topic"
209    [topic event includes]
210      (topic-emit! topic includes TYPE-ID-EVENT topic event))
211  
212  
213  ;; WAMP callbacks
214  
215  (defn- callback-rewrite
216    "Utility for rewriting params with an optional callback fn."
217    [callback & params]
218    (if (fn? callback)
219      (apply callback params)
220      (when (or (nil? callback) (true? callback))
221        params)))
222  
223  (defn- on-close
224    "Clean up clients and topics upon disconnect."
225    [sess-id close-cb unsub-cb]
226    (fn [status]
227      (dosync
228        (when (fn? close-cb) (close-cb sess-id status))
229        (if-let [sess-topics (@client-topics sess-id)]
230          (doseq [[topic _] sess-topics]
231            (topic-unsubscribe topic sess-id)
232            (when (fn? unsub-cb) (unsub-cb sess-id topic))))
233        (del-client sess-id))))
234  
235  (defn- call-success
236    [sess-id topic call-id result on-after-cb]
237    (let [cb-params [sess-id topic call-id result]
238          cb-params (apply callback-rewrite on-after-cb cb-params)
239          [sess-id topic call-id result] cb-params]
240      (send-call-result! sess-id call-id result)))
241  
242  (defn- call-error
243    [sess-id topic call-id error on-after-cb]
244    (let [cb-params [sess-id topic call-id error]
245          cb-params (apply callback-rewrite on-after-cb cb-params)
246          [sess-id topic call-id error] cb-params
247          {err-uri :uri err-msg :message err-desc :description kill :kill} error
248          err-uri (if (nil? err-uri) URI-WAMP-ERROR-GENERIC err-uri)
249          err-msg (if (nil? err-msg) DESC-WAMP-ERROR-GENERIC err-msg)]
250      (send-call-error! sess-id call-id err-uri err-msg err-desc)
251      (when kill (close-channel sess-id))))
252  
253  ; Optional session id for rpc calls
254  (def ^:dynamic *call-sess-id* nil)
255  
256  ;; WAMP-CRA Authentication
257  
258  (defn hmac-sha-256
259    "Generates a HMAC SHA256 hash."
260    [^String key ^String data]
261    (let [hmac-key (SecretKeySpec. (.getBytes key) "HmacSHA256")
262          hmac     (doto (Mac/getInstance "HmacSHA256") (.init hmac-key))
263          result   (.doFinal hmac (.getBytes data))]
264      (String. (base64/encode result) "UTF-8")))
265  
266  (defn auth-challenge
267    "Generates a challenge hash used by the client to sign the secret."
268    [sess-id auth-key auth-secret]
269    (let [hmac-key (str auth-secret "-" (System/currentTimeMillis) "-" sess-id)]
270      (hmac-sha-256 hmac-key auth-key)))
271  
272  (defn- auth-sig-match?
273    "Check whether the client signature matches the server's signature."
274    [sess-id signature]
275    (if-let [auth-sig (get-in @client-auth [sess-id :sig])]
276      (= signature auth-sig)))
277  
278  (defn- add-client-auth-sig
279    "Stores the authorization signature on the server-side for later
280    comparison with the client."
281    [sess-id auth-key auth-secret challenge]
282    (let [sig (hmac-sha-256 challenge auth-secret)]
283      (dosync
284        (alter client-auth assoc sess-id {:sig   sig
285                                          :key   auth-key
286                                          :auth? false}))
287      sig))
288  
289  (defn- add-client-auth-anon
290    "Stores anonymous client metadata with the session."
291    [sess-id]
292    (dosync (alter client-auth assoc sess-id {:key :anon :auth? false})))
293  
294  (defn client-auth-requested?
295    "Checks if the authreq call has already occurred."
296    [sess-id]
297    (not (nil? (get-in @client-auth [sess-id :key]))))
298  
299  (defn client-authenticated?
300    "Checks if authentication has occurred."
301    [sess-id]
302    (get-in @client-auth [sess-id :auth?]))
303  
304  (defn authorized?
305    "Checks if the session is authorized for a message type and topic."
306    [sess-id type topic perm-cb]
307    (if-let [auth-key (get-in @client-auth [sess-id :key])]
308      (let [perms (perm-cb sess-id auth-key)]
309        (get-in perms [type topic]))))
310  
311  (defn- create-call-authreq
312    "Creates a callback for the authreq RPC call."
313    [allow-anon? secret-cb]
314    (fn [& [auth-key extra]]
315      (dosync
316        (if (client-authenticated? *call-sess-id*)
317          {:error {:uri (str URI-WAMP-ERROR "already-authenticated")
318                   :message "already authenticated"}}
319          (if (client-auth-requested? *call-sess-id*)
320            {:error {:uri (str URI-WAMP-ERROR "authentication-already-requested")
321                     :message "authentication request already issued - authentication pending"}}
322  
323            (if (nil? auth-key)
324              ; Allow anonymous auth?
325              (if-not allow-anon?
326                {:error {:uri (str URI-WAMP-ERROR "anonymous-auth-forbidden")
327                         :message "authentication as anonymous is forbidden"}}
328                (do
329                  (add-client-auth-anon *call-sess-id*)
330                  nil)) ; return nil
331              ; Non-anonymous auth
332              (if-let [auth-secret (secret-cb *call-sess-id* auth-key extra)]
333                (let [challenge (auth-challenge *call-sess-id* auth-key auth-secret)]
334                  (add-client-auth-sig *call-sess-id* auth-key auth-secret challenge)
335                  challenge) ; return the challenge
336                {:error {:uri (str URI-WAMP-ERROR "no-such-authkey")
337                         :message "authentication key does not exist"}})))))))
338  
339  (defn- create-call-auth
340    "Creates a callback for the auth RPC call."
341    [perm-cb]
342    (fn [& [signature]]
343      (dosync
344        (if (client-authenticated? *call-sess-id*)
345          {:error {:uri (str URI-WAMP-ERROR "already-authenticated")
346                   :message "already authenticated"}}
347          (if (not (client-auth-requested? *call-sess-id*))
348            {:error {:uri (str URI-WAMP-ERROR "no-authentication-requested")
349                     :message "no authentication previously requested"}}
350            (let [auth-key (get-in @client-auth [*call-sess-id* :key])]
351              (if (or (= :anon auth-key) (auth-sig-match? *call-sess-id* signature))
352                (do
353                  (alter client-auth assoc-in [*call-sess-id* :auth?] true)
354                  (perm-cb *call-sess-id* auth-key))
355                (do
356                  ; remove previous auth data, must request and authenticate again
357                  (alter client-auth dissoc *call-sess-id*)
358                  {:error {:uri (str URI-WAMP-ERROR "invalid-signature")
359                           :message "signature for authentication request is invalid"}}))))))))
360  
361  (defn- init-cr-auth
362    "Initializes the authorization RPC calls (if configured)."
363    [callbacks]
364    (if-let [auth-cbs (callbacks :on-auth)]
365      (let [allow-anon? (auth-cbs :allow-anon?)
366            secret-cb   (auth-cbs :secret)
367            perm-cb     (auth-cbs :permissions)]
368        (merge-with merge callbacks
369          {:on-call {URI-WAMP-CALL-AUTHREQ (create-call-authreq allow-anon? secret-cb)
370                     URI-WAMP-CALL-AUTH    (create-call-auth perm-cb)}}))
371      callbacks))
372  
373  (defn- auth-timeout
374    "Closes the session if the client has not authenticated."
375    [sess-id]
376    (when-not (client-authenticated? sess-id)
377      (close-channel sess-id)))
378  
379  (defn- init-auth-timer
380    "Starts a timer to ensure authentication, else the session is closed."
381    [callbacks sess-id]
382    (when-let [auth-cbs (callbacks :on-auth)]
383      (let [timeout-ms (auth-cbs :timeout 20000)
384            task       (timer/schedule-task timeout-ms (auth-timeout sess-id))]
385        task)))
386  
387  ;; WAMP PubSub/RPC callbacks
388  
389  (defn- on-call
390    "Handle WAMP call (RPC) messages"
391    [callbacks sess-id topic call-id & call-params]
392    (if-let [rpc-cb (callbacks topic)]
393      (try
394        (let [cb-params [sess-id topic call-id call-params]
395              cb-params (apply callback-rewrite (callbacks :on-before) cb-params)
396              [sess-id topic call-id call-params] cb-params
397              rpc-result (binding [*call-sess-id* sess-id]  ; bind optional sess-id
398                           (apply rpc-cb call-params))      ; use fn's own arg signature
399              error      (:error  rpc-result)
400              result     (:result rpc-result)]
401          (if (and (nil? error) (nil? result))
402            ; No map with result or error? Assume successful rpc-result as-is
403            (call-success sess-id topic call-id rpc-result (callbacks :on-after-success))
404            (if (nil? error)
405              (call-success sess-id topic call-id result (callbacks :on-after-success))
406              (call-error   sess-id topic call-id error  (callbacks :on-after-error)))))
407  
408        (catch Exception e
409          (call-error sess-id topic call-id
410            {:uri URI-WAMP-ERROR-INTERNAL
411             :message DESC-WAMP-ERROR-INTERNAL
412             :description (.getMessage e)}
413            (callbacks :on-after-error))
414          (log/error "RPC Exception:" topic call-params e)))
415  
416      (call-error sess-id topic call-id
417        {:uri URI-WAMP-ERROR-NOTFOUND
418         :message DESC-WAMP-ERROR-NOTFOUND}
419        (callbacks :on-after-error))))
420  
421  (defn- map-key-or-prefix
422    "Finds a map value by key or lookup by string key prefix (ending with *)."
423    [m k]
424    (if-let [v (m k)] v
425      (some #(when (not (nil? %)) %)
426        (for [[mk mv] m]
427          (when (and (not (keyword? mk)) (not (false? mv))
428                  (= \* (last mk))
429                  (= (take (dec (count mk)) k) (butlast mk)))
430            mv)))))
431  
432  (defn- on-subscribe
433    [callbacks sess-id topic]
434    (dosync
435      (when (nil? (get-in @topic-clients [topic sess-id]))
436        (when-let [topic-cb (map-key-or-prefix callbacks topic)]
437          (when (or (true? topic-cb) (topic-cb sess-id topic))
438            (let [on-after-cb (callbacks :on-after)]
439              (topic-subscribe topic sess-id)
440              (when (fn? on-after-cb)
441                (on-after-cb sess-id topic))))))))
442  
443  (defn- get-publish-exclude [sess-id exclude]
444    (if (= Boolean (type exclude))
445      (when (true? exclude) [sess-id])
446      exclude))
447  
448  (defn- on-publish
449    "Handles WAMP publish messages, sending event messages back out
450    to clients subscribed to the topic.
451    [ TYPE_ID_PUBLISH , topicURI , event [, exclude [, eligible ]]"
452    ([callbacks sess-id topic event]
453      (on-publish callbacks sess-id topic event false nil))
454    ([callbacks sess-id topic event exclude]
455      (on-publish callbacks sess-id topic event exclude nil))
456    ([callbacks sess-id topic event exclude eligible]
457      (when-let [pub-cb (map-key-or-prefix callbacks topic)]
458        (let [cb-params [sess-id topic event exclude eligible]
459              cb-params (apply callback-rewrite pub-cb cb-params)
460              on-after-cb (callbacks :on-after)]
461          (when (sequential? cb-params)
462            (let [[sess-id topic event exclude eligible] cb-params
463                  exclude (get-publish-exclude sess-id exclude)]
464              (if-not (nil? eligible)
465                (emit-event! topic event eligible)
466                (broadcast-event! topic event exclude))
467              (when (fn? on-after-cb)
468                (on-after-cb sess-id topic event exclude eligible))))))))
469  
470  (defn- on-message
471    "Handles all http-kit messages. parses the incoming data as json
472    and finds the appropriate wamp callback."
473    [sess-id callbacks]
474    (fn [data]
475      (log/trace "Data received:" data)
476      (let [[msg-type & msg-params] (try (json/decode data)
477                                      (catch com.fasterxml.jackson.core.JsonParseException ex
478                                        [nil nil]))
479            on-call-cbs  (callbacks :on-call)
480            on-sub-cbs   (callbacks :on-subscribe)
481            on-unsub-cb  (callbacks :on-unsubscribe)
482            on-pub-cbs   (callbacks :on-publish)
483            perm-cb      (get-in callbacks [:on-auth :permissions])]
484        (case msg-type
485  
486          1 ;TYPE-ID-PREFIX
487          (apply add-topic-prefix sess-id msg-params)
488  
489          2 ;TYPE-ID-CALL
490          (if (map? on-call-cbs)
491            (let [[call-id topic-uri & call-params] msg-params
492                  topic (get-topic sess-id topic-uri)]
493              (if (or (nil? perm-cb)
494                      (= URI-WAMP-CALL-AUTHREQ topic)
495                      (= URI-WAMP-CALL-AUTH topic)
496                      (authorized? sess-id :rpc topic perm-cb))
497                (apply on-call on-call-cbs sess-id topic call-id call-params)
498                (call-error sess-id topic call-id
499                  {:uri URI-WAMP-ERROR-NOAUTH :message DESC-WAMP-ERROR-NOAUTH}
500                  (on-call-cbs :on-after-error)))))
501  
502          5 ;TYPE-ID-SUBSCRIBE
503          (let [topic (get-topic sess-id (first msg-params))]
504            (if (or (nil? perm-cb) (authorized? sess-id :subscribe topic perm-cb))
505              (on-subscribe on-sub-cbs sess-id topic)))
506  
507          6 ;TYPE-ID-UNSUBSCRIBE
508          (let [topic (get-topic sess-id (first msg-params))]
509            (dosync
510              (when (true? (get-in @topic-clients [topic sess-id]))
511                (topic-unsubscribe topic sess-id)
512                (when (fn? on-unsub-cb) (on-unsub-cb sess-id topic)))))
513  
514          7 ;TYPE-ID-PUBLISH
515          (let [[topic-uri event & pub-args] msg-params
516                topic (get-topic sess-id topic-uri)]
517            (if (or (nil? perm-cb) (authorized? sess-id :publish topic perm-cb))
518              (apply on-publish on-pub-cbs sess-id topic event pub-args)))
519  
520          ; default: Unknown message type
521          (log/warn "Unknown message type" data)))))
522  
523  
524  (defn http-kit-handler
525    "Sets up the necessary http-kit websocket event handlers
526    for use with the WAMP sub-protocol. Returns a WAMP client session id.
527  
528    Example usage:
529  
530      (http-kit/with-channel req channel
531        (if-not (:websocket? req)
532          (http-kit/close channel)
533          (http-kit-handler channel
534            {:on-open        on-open-fn
535             :on-close       on-close-fn
536  
537             :on-auth        {:allow-anon?     false         ; allow anonymous authentication?
538                              :timeout         20000         ; default is 20 secs
539                              :secret          auth-secret-fn
540                              :permissions     auth-permissions-fn}
541  
542             :on-call        {(rpc-url \"add\")      +         ; map topics to rpc functions
543                              (rpc-url \"echo\")     identity
544                              :on-before           on-before-call-fn
545                              :on-after-error      on-after-call-error-fn
546                              :on-after-success    on-after-call-success-fn}
547  
548             :on-subscribe   {(evt-url \"chat\")     on-subscribe-fn? ; allowed to subscribe?
549                              (evt-url \"prefix*\")  true             ; match topics by prefix
550                              (evt-url \"sub-only\") true             ; implicitly allowed
551                              (evt-url \"pub-only\") false            ; subscription is denied
552                              :on-after            on-after-subscribe-fn};
553  
554             :on-publish     {(evt-url \"chat\")     on-publish-fn   ; custom event broker
555                              (evt-url \"prefix*\")  true            ; pass events through as-is
556                              (evt-url \"sub-only\") false           ; publishing is denied
557                              (evt-url \"pub-only\") true
558                              :on-after            on-after-publish-fn}
559  
560             :on-unsubscribe on-unsubscribe-fn})))
561  
562    Callback signatures:
563  
564      (on-open-fn sess-id)
565      (on-close-fn sess-id status)
566  
567      (auth-secret-fn sess-id auth-key auth-extra)
568        Provide the authentication secret for the key (ie. username) and
569        (optionally) extra information from the client. Return nil if the key
570        does not exist.
571  
572      (auth-permissions-fn sess-id auth-key)
573        Returns a map of permissions the session is granted when the authentication
574        succeeds for the given key.
575  
576        The permission map should be comprised of the topics that are allowed
577        for each category:
578  
579          {:rpc       {\"http://example/rpc#call\"    true}
580           :subscribe {\"http://example/event#allow\" true
581                       \"http://example/event#deny\"  false}
582           :publish   {\"http://example/event#allow\" true}}
583  
584      (rpc-call ...)
585        Can have any signature. The parameters received from the client will be applied as-is.
586        The client session is also available in the bound *call-sess-id* var.
587        The function may return a value as is, or in a result map: {:result \"my result\"},
588        or as an error map: {:error {:uri \"http://example.com/error#give-error\"
589                                     :message \"Test error\"
590                                     :description \"Test error description\"
591                                     :kill false}} ; true will close the connection after send
592  
593      (on-before-call-fn sess-id topic call-id call-params)
594        To allow call, return params as vector: [sess-id topic call-id call-params]
595        To deny, return nil/false.
596  
597      (on-after-call-error-fn sess-id topic call-id error)
598        Return params as vector: [sess-id topic call-id error]
599  
600      (on-after-call-success-fn sess-id topic call-id result)
601        Return params as vector: [sess-id topic call-id result]
602  
603      (on-subscribe-fn? sess-id topic)
604        Return true to allow client to subscribe, false to deny.
605  
606      (on-after-subscribe-fn sess-id topic)
607        No return values required.
608  
609      (on-publish-fn sess-id topic event exclude eligible)
610        To allow publish, return params as vector: [sess-id topic event exclude eligible]
611        To deny, return nil/false.
612  
613      (on-after-publish-fn sess-id topic event exclude eligible)
614        No return values required.
615  
616      (on-unsubscribe-fn sess-id topic)
617        No return values required."
618    [channel callbacks-map]
619    (let [callbacks-map (init-cr-auth callbacks-map)
620          cb-on-open    (callbacks-map :on-open)
621          sess-id       (add-client channel)]
622      (httpkit/on-close channel   (on-close sess-id
623                                    (callbacks-map :on-close)
624                                    (callbacks-map :on-unsubscribe)))
625      (httpkit/on-receive channel (on-message sess-id callbacks-map))
626      (send-welcome! sess-id)
627      (when (fn? cb-on-open) (cb-on-open sess-id))
628      (init-auth-timer callbacks-map sess-id)
629      sess-id))
630  
631  
632  (defn origin-match?
633    "Compares a regular expression against the Origin: header.
634    Used to help protect against CSRF, but do not depend on just
635    this check. Best to use a server-generated CSRF token for comparison."
636    [origin-re req]
637    (if-let [req-origin (get-in req [:headers "origin"])]
638      (re-matches origin-re req-origin)))
639  
640  (defn subprotocol?
641    "Checks if a protocol string exists in the Sec-WebSocket-Protocol
642    list header."
643    [proto req]
644    (if-let [protocols (get-in req [:headers "sec-websocket-protocol"])]
645      (some #{proto}
646        (map #(lower-case (trim %))
647          (split protocols #",")))))
648  
649  (defmacro with-channel-validation
650    "Replaces HTTP Kit with-channel macro to do extra validation
651    for the wamp subprotocol and allowed origin URLs.
652  
653    Example usage:
654  
655      (defn my-wamp-handler [request]
656        (wamp/with-channel-validation request channel #\"https?://myhost\"
657          (wamp/http-kit-handler channel { ... })))
658  
659    See org.httpkit.server for more information."
660    [request ch-name origin-re & body]
661    `(let [~ch-name (:async-channel ~request)]
662       (if (:websocket? ~request)
663         (if-let [key# (get-in ~request [:headers "sec-websocket-key"])]
664           (if (origin-match? ~origin-re ~request)
665             (if (subprotocol? "wamp" ~request)
666               (do
667                 (.sendHandshake ~(with-meta ch-name {:tag `AsyncChannel})
668                   {"Upgrade"                "websocket"
669                    "Connection"             "Upgrade"
670                    "Sec-WebSocket-Accept"   (httpkit/accept key#)
671                    "Sec-WebSocket-Protocol" "wamp"})
672                 [email protected]
673                 {:body ~ch-name})
674               {:status 400 :body "missing or bad WebSocket-Protocol"})
675             {:status 400 :body "missing or bad WebSocket-Origin"})
676           {:status 400 :body "missing or bad WebSocket-Key"})
677         {:status 400 :body "not websocket protocol"})))