summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2012-08-14 01:03:13 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2012-08-14 01:03:13 +0200
commit68d89cee2dc7c482d9029b2fd0721611322dcae6 (patch)
tree8da626e0a8ab515968e997e673d8bd960a1be80b
parentd5a8420694bc9f5b269f46097829d3178e116923 (diff)
downloadimap-to-db-68d89cee2dc7c482d9029b2fd0721611322dcae6.tar.gz
imap-to-db-68d89cee2dc7c482d9029b2fd0721611322dcae6.tar.bz2
imap-to-db-68d89cee2dc7c482d9029b2fd0721611322dcae6.tar.xz
imap-to-db-68d89cee2dc7c482d9029b2fd0721611322dcae6.zip
o Db access happen from its own thread.HEADmaster
-rw-r--r--src/main/scala/main.scala159
1 files changed, 117 insertions, 42 deletions
diff --git a/src/main/scala/main.scala b/src/main/scala/main.scala
index e8e1d6d..f228949 100644
--- a/src/main/scala/main.scala
+++ b/src/main/scala/main.scala
@@ -12,6 +12,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
+/*
object MyConnectionListener extends ConnectionListener {
def closed(e: ConnectionEvent) {
@@ -27,7 +28,7 @@ object MyConnectionListener extends ConnectionListener {
}
}
-class MyMessageCountListener(f: IMAPFolder, deque: Deque[Long]) extends MessageCountListener {
+class MyMessageCountListener(f: IMAPFolder, dao: Dao) extends MessageCountListener {
def messagesAdded(e: MessageCountEvent) {
println("messagesAdded: added count=" + e.getMessages.length)
e.getMessages.foreach { message =>
@@ -44,7 +45,7 @@ class MyMessageCountListener(f: IMAPFolder, deque: Deque[Long]) extends MessageC
}
}
}
-
+*/
/*
object MyMessageChangedListener extends MessageChangedListener {
def messageChanged(e: MessageChangedEvent) {
@@ -55,6 +56,7 @@ object MyMessageChangedListener extends MessageChangedListener {
}
*/
+/*
object MyFolderListener extends FolderListener {
def folderCreated(e: FolderEvent) {
println("folder created")
@@ -76,6 +78,7 @@ object MyStoreListener extends StoreListener {
println("message: " + e.getMessage())
}
}
+*/
class Dao(c: Connection) extends Closeable {
def close() {
@@ -118,7 +121,78 @@ class Dao(c: Connection) extends Closeable {
}
}
-object Service {
+class DbThread(factory: () => Service) extends Thread with Closeable {
+ private val queue = new LinkedBlockingQueue[(IMAPFolder, Message)]()
+
+ private var shouldRun = true
+
+ def add(folder: IMAPFolder, message: Message) {
+ println("Adding message: " + message)
+ queue.add((folder, message))
+ /*
+ queue.synchronized {
+ queue.add((folder, message))
+ queue.notify()
+ }
+ */
+ }
+
+ def close() {
+ /*
+ queue.synchronized {
+ shouldRun = false
+ queue.notify()
+ this.interrupt()
+ }
+ */
+ this.interrupt()
+ }
+
+ def processQueue(service: Service) {
+ var item = queue.poll()
+ while(item != null) {
+ var (folder, message) = item
+ service.processMessage(folder, message)
+ item = queue.poll()
+ }
+ }
+
+ override def run() {
+ var service = factory()
+ while(shouldRun) {
+ if(service == null) {
+ service = factory()
+ }
+ try {
+ /*
+ queue.synchronized {
+ queue.wait
+ }
+ */
+ Option(queue.poll(1, TimeUnit.DAYS)) match {
+ case None =>
+ case Some((folder, message)) =>
+ println("Processing message: " + message)
+ service.processMessage(folder, message)
+ }
+ }
+ catch {
+ case e =>
+ IOUtils.closeQuietly(service)
+ service = null
+ if(shouldRun)
+ e.printStackTrace();
+ }
+ }
+ println("db thread stopping")
+ }
+}
+
+class Service(dao: Dao) extends Closeable {
+ def close() {
+ IOUtils.closeQuietly(dao)
+ }
+
def headers(part: Part): (Array[String], Array[String]) = {
var headers = part.getAllHeaders.asInstanceOf[java.util.Enumeration[Header]].asScala.toList
var (names, values) = headers.unzip({
@@ -127,7 +201,24 @@ object Service {
})
return (names.toArray, values.toArray)
}
- def processMail(dao: Dao, uid: Long, message: Message) {
+
+ def processMessage(folder: IMAPFolder, message: Message) {
+ val uid = folder.getUID(message)
+ var ok = tx { dao =>
+ println(new Date)
+ if(dao.countMail(uid) > 0) {
+ println("Message already imported: uid=" + uid)
+ }
+ else {
+ processMail(uid, message)
+ }
+ }
+ if(ok.isDefined && !message.getFlags().contains(Flags.Flag.DELETED)) {
+ message.setFlag(Flags.Flag.DELETED, true)
+ }
+ }
+
+ def processMail(uid: Long, message: Message) {
println("Importing message, uid=" + uid)
var (names, values) = headers(message)
@@ -183,9 +274,7 @@ object Service {
}
}
}
-}
-class ConnectionHandler(f: IMAPFolder, dao: Dao) {
def tx[T](f: Dao => T): Option[T] = {
dao.begin()
try {
@@ -199,45 +288,31 @@ class ConnectionHandler(f: IMAPFolder, dao: Dao) {
None
}
}
+}
+class ConnectionHandler(f: IMAPFolder, thread: DbThread) {
def process() {
- val deque = new LinkedBlockingDeque[Long]()
-
println("Folder: " + f.getFullName())
f.open(Folder.READ_WRITE)
println("Folder has " + f.getMessageCount() + " messages, " + f.getNewMessageCount + " new and " + f.getDeletedMessageCount + " deleted.")
// f.addMessageChangedListener(MyMessageChangedListener)
- f.addMessageCountListener(new MyMessageCountListener(f, deque))
+ f.addMessageCountListener(new MessageCountListener {
+ def messagesAdded(e: MessageCountEvent) {
+ e.getMessages.foreach { message =>
+ thread.add(f, message)
+ }
+ }
- f.getMessages().foreach(processMessage)
+ def messagesRemoved(e: MessageCountEvent) {}
+ })
+
+ f.getMessages().foreach(m => thread.add(f, m))
do {
- println("IDLE")
+ println("Going idle...")
f.idle(true);
- println("Waiting for message..")
- /*
- val message = deque.poll
- if(message != null)
- processMessage(message)
- */
} while(true)
}
-
- def processMessage(message: Message) {
- val uid = f.getUID(message)
- var ok = tx { dao =>
- println(new Date)
- if(dao.countMail(uid) > 0) {
- println("Message already imported: uid=" + uid)
- }
- else {
- Service.processMail(dao, uid, message)
- }
- }
- if(ok.isDefined) {
- message.setFlag(Flags.Flag.DELETED, true)
- }
- }
}
class ImapToDb(f: File) {
@@ -304,8 +379,8 @@ class ImapToDb(f: File) {
def connectToFolder() = {
val store = session.getStore("imaps")
store.connect(imapHost, imapUsername, imapPassword)
- store.addConnectionListener(MyConnectionListener)
- store.addStoreListener(MyStoreListener)
+ // store.addConnectionListener(MyConnectionListener)
+ // store.addStoreListener(MyStoreListener)
// store.addFolderListener(MyFolderListener)
val folder = store.getFolder("INBOX")
// folder.addConnectionListener(MyConnectionListener)
@@ -319,30 +394,30 @@ class ImapToDb(f: File) {
}
def connectToDb() = {
+ println("Connecting to db...")
val c = DriverManager.getConnection(dbUrl, dbProps)
c.setAutoCommit(false)
- new Dao(c)
+ new Service(new Dao(c))
}
+ val thread = new DbThread(connectToDb)
+ thread.setDaemon(true)
+ thread.start()
while(true) {
var folder: IMAPFolder = null
- var dao: Dao = null
try {
folder = connectToFolder()
- dao = connectToDb()
- new ConnectionHandler(folder, dao).process()
+ new ConnectionHandler(folder, thread).process()
} catch {
case e =>
System.out.println("Uncaught exception: " + e.getMessage)
} finally {
- try { folder.getStore.close() } catch { case _ => }
- IOUtils.closeQuietly(dao)
+ IOUtils.closeQuietly(new Closeable { def close { folder.getStore.close() } } )
Thread.sleep(1000)
}
}
}
}
-
}
object ImapToDb extends App {