Clojure agents consuming from a queue

Posted by erikcw on Stack Overflow See other posts from Stack Overflow or by erikcw
Published on 2010-04-08T19:28:03Z Indexed on 2010/04/09 0:43 UTC
Read the original article Hit count: 430

Filed under:
|
|

I'm trying to figure out the best way to use agents to consume items from a Message Queue (Amazon SQS). Right now I have a function (process-queue-item) that grabs an items from the queue, and processes it.

I want to process these items concurrently, but I can't wrap my head around how to control the agents. Basically I want to keep all of the agents busy as much as possible without pulling to many items from the Queue and developing a backlog (I'll have this running on a couple of machines, so items need to be left in the queue until they are really needed).

Can anyone give me some pointers on improving my implementation?

(def active-agents (ref 0))

(defn process-queue-item [_]
  (dosync (alter active-agents inc))
  ;retrieve item from Message Queue (Amazon SQS) and process
  (dosync (alter active-agents dec)))

(defn -main []
  (def agents (for [x (range 20)] (agent x)))

  (loop [loop-count 0]

    (if (< @active-agents 20)
      (doseq [agent agents]
        (if (agent-errors agent)
          (clear-agent-errors agent))
        ;should skip this agent until later if it is still busy processing (not sure how)
        (send-off agent process-queue-item)))

    ;(apply await-for (* 10 1000) agents)
    (Thread/sleep  10000)
    (logging/info (str "ACTIVE AGENTS " @active-agents))
    (if (> 10 loop-count)
      (do (logging/info (str "done, let's cleanup " count))
       (doseq [agent agents]
         (if (agent-errors agent)
           (clear-agent-errors agent)))
       (apply await agents)
       (shutdown-agents))
      (recur (inc count)))))

© Stack Overflow or respective owner

Related posts about clojure

Related posts about concurrency