From 68d89cee2dc7c482d9029b2fd0721611322dcae6 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 14 Aug 2012 01:03:13 +0200 Subject: o Db access happen from its own thread. --- src/main/scala/main.scala | 159 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 117 insertions(+), 42 deletions(-) diff --git a/src/main/scala/main.scala b/src/main/scala/main.scala index e8e1d6d..f228949 100644 --- a/src/main/scala/main.scala +++ b/src/main/scala/main.scala @@ -12,6 +12,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ +/* object MyConnectionListener extends ConnectionListener { def closed(e: ConnectionEvent) { @@ -27,7 +28,7 @@ object MyConnectionListener extends ConnectionListener { } } -class MyMessageCountListener(f: IMAPFolder, deque: Deque[Long]) extends MessageCountListener { +class MyMessageCountListener(f: IMAPFolder, dao: Dao) extends MessageCountListener { def messagesAdded(e: MessageCountEvent) { println("messagesAdded: added count=" + e.getMessages.length) e.getMessages.foreach { message => @@ -44,7 +45,7 @@ class MyMessageCountListener(f: IMAPFolder, deque: Deque[Long]) extends MessageC } } } - +*/ /* object MyMessageChangedListener extends MessageChangedListener { def messageChanged(e: MessageChangedEvent) { @@ -55,6 +56,7 @@ object MyMessageChangedListener extends MessageChangedListener { } */ +/* object MyFolderListener extends FolderListener { def folderCreated(e: FolderEvent) { println("folder created") @@ -76,6 +78,7 @@ object MyStoreListener extends StoreListener { println("message: " + e.getMessage()) } } +*/ class Dao(c: Connection) extends Closeable { def close() { @@ -118,7 +121,78 @@ class Dao(c: Connection) extends Closeable { } } -object Service { +class DbThread(factory: () => Service) extends Thread with Closeable { + private val queue = new LinkedBlockingQueue[(IMAPFolder, Message)]() + + private var shouldRun = true + + def add(folder: IMAPFolder, message: Message) { + println("Adding message: " + message) + queue.add((folder, message)) + /* + queue.synchronized { + queue.add((folder, message)) + queue.notify() + } + */ + } + + def close() { + /* + queue.synchronized { + shouldRun = false + queue.notify() + this.interrupt() + } + */ + this.interrupt() + } + + def processQueue(service: Service) { + var item = queue.poll() + while(item != null) { + var (folder, message) = item + service.processMessage(folder, message) + item = queue.poll() + } + } + + override def run() { + var service = factory() + while(shouldRun) { + if(service == null) { + service = factory() + } + try { + /* + queue.synchronized { + queue.wait + } + */ + Option(queue.poll(1, TimeUnit.DAYS)) match { + case None => + case Some((folder, message)) => + println("Processing message: " + message) + service.processMessage(folder, message) + } + } + catch { + case e => + IOUtils.closeQuietly(service) + service = null + if(shouldRun) + e.printStackTrace(); + } + } + println("db thread stopping") + } +} + +class Service(dao: Dao) extends Closeable { + def close() { + IOUtils.closeQuietly(dao) + } + def headers(part: Part): (Array[String], Array[String]) = { var headers = part.getAllHeaders.asInstanceOf[java.util.Enumeration[Header]].asScala.toList var (names, values) = headers.unzip({ @@ -127,7 +201,24 @@ object Service { }) return (names.toArray, values.toArray) } - def processMail(dao: Dao, uid: Long, message: Message) { + + def processMessage(folder: IMAPFolder, message: Message) { + val uid = folder.getUID(message) + var ok = tx { dao => + println(new Date) + if(dao.countMail(uid) > 0) { + println("Message already imported: uid=" + uid) + } + else { + processMail(uid, message) + } + } + if(ok.isDefined && !message.getFlags().contains(Flags.Flag.DELETED)) { + message.setFlag(Flags.Flag.DELETED, true) + } + } + + def processMail(uid: Long, message: Message) { println("Importing message, uid=" + uid) var (names, values) = headers(message) @@ -183,9 +274,7 @@ object Service { } } } -} -class ConnectionHandler(f: IMAPFolder, dao: Dao) { def tx[T](f: Dao => T): Option[T] = { dao.begin() try { @@ -199,45 +288,31 @@ class ConnectionHandler(f: IMAPFolder, dao: Dao) { None } } +} +class ConnectionHandler(f: IMAPFolder, thread: DbThread) { def process() { - val deque = new LinkedBlockingDeque[Long]() - println("Folder: " + f.getFullName()) f.open(Folder.READ_WRITE) println("Folder has " + f.getMessageCount() + " messages, " + f.getNewMessageCount + " new and " + f.getDeletedMessageCount + " deleted.") // f.addMessageChangedListener(MyMessageChangedListener) - f.addMessageCountListener(new MyMessageCountListener(f, deque)) + f.addMessageCountListener(new MessageCountListener { + def messagesAdded(e: MessageCountEvent) { + e.getMessages.foreach { message => + thread.add(f, message) + } + } - f.getMessages().foreach(processMessage) + def messagesRemoved(e: MessageCountEvent) {} + }) + + f.getMessages().foreach(m => thread.add(f, m)) do { - println("IDLE") + println("Going idle...") f.idle(true); - println("Waiting for message..") - /* - val message = deque.poll - if(message != null) - processMessage(message) - */ } while(true) } - - def processMessage(message: Message) { - val uid = f.getUID(message) - var ok = tx { dao => - println(new Date) - if(dao.countMail(uid) > 0) { - println("Message already imported: uid=" + uid) - } - else { - Service.processMail(dao, uid, message) - } - } - if(ok.isDefined) { - message.setFlag(Flags.Flag.DELETED, true) - } - } } class ImapToDb(f: File) { @@ -304,8 +379,8 @@ class ImapToDb(f: File) { def connectToFolder() = { val store = session.getStore("imaps") store.connect(imapHost, imapUsername, imapPassword) - store.addConnectionListener(MyConnectionListener) - store.addStoreListener(MyStoreListener) + // store.addConnectionListener(MyConnectionListener) + // store.addStoreListener(MyStoreListener) // store.addFolderListener(MyFolderListener) val folder = store.getFolder("INBOX") // folder.addConnectionListener(MyConnectionListener) @@ -319,30 +394,30 @@ class ImapToDb(f: File) { } def connectToDb() = { + println("Connecting to db...") val c = DriverManager.getConnection(dbUrl, dbProps) c.setAutoCommit(false) - new Dao(c) + new Service(new Dao(c)) } + val thread = new DbThread(connectToDb) + thread.setDaemon(true) + thread.start() while(true) { var folder: IMAPFolder = null - var dao: Dao = null try { folder = connectToFolder() - dao = connectToDb() - new ConnectionHandler(folder, dao).process() + new ConnectionHandler(folder, thread).process() } catch { case e => System.out.println("Uncaught exception: " + e.getMessage) } finally { - try { folder.getStore.close() } catch { case _ => } - IOUtils.closeQuietly(dao) + IOUtils.closeQuietly(new Closeable { def close { folder.getStore.close() } } ) Thread.sleep(1000) } } } } - } object ImapToDb extends App { -- cgit v1.2.3