summaryrefslogtreecommitdiff
path: root/src/main/scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala')
-rw-r--r--src/main/scala/main.scala350
1 files changed, 350 insertions, 0 deletions
diff --git a/src/main/scala/main.scala b/src/main/scala/main.scala
new file mode 100644
index 0000000..e8e1d6d
--- /dev/null
+++ b/src/main/scala/main.scala
@@ -0,0 +1,350 @@
+import com.sun.mail.imap._
+import com.sun.mail.util.MailSSLSocketFactory
+import java.io._
+import java.sql.{Array => SqlArray, Date => SqlDate, _}
+import java.util._
+import java.util.concurrent._
+import javax.mail._
+import javax.mail.event._
+import javax.mail.internet._
+import org.apache.commons.io.IOUtils
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConverters._
+
+object MyConnectionListener extends ConnectionListener {
+
+ def closed(e: ConnectionEvent) {
+ println("MyConnectionListener: closed");
+ }
+
+ def disconnected(e: ConnectionEvent) {
+ println("MyConnectionListener: disconnected");
+ }
+
+ def opened(e: ConnectionEvent) {
+ println("MyConnectionListener: opened");
+ }
+}
+
+class MyMessageCountListener(f: IMAPFolder, deque: Deque[Long]) extends MessageCountListener {
+ def messagesAdded(e: MessageCountEvent) {
+ println("messagesAdded: added count=" + e.getMessages.length)
+ e.getMessages.foreach { message =>
+ val uid = f.getUID(message)
+ println("uid=" + uid)
+ deque.push(uid)
+ }
+ }
+
+ def messagesRemoved(e: MessageCountEvent) {
+ println("messagesRemoved: removed count=" + e.getMessages.length + ", isRemoved=" + e.isRemoved)
+ e.getMessages.foreach { message =>
+ println("message: " + message.getFrom)
+ }
+ }
+}
+
+/*
+object MyMessageChangedListener extends MessageChangedListener {
+ def messageChanged(e: MessageChangedEvent) {
+ val t = if(e.getMessageChangeType() == MessageChangedEvent.FLAGS_CHANGED) "FLAGS_CHANGED" else "ENVELOPE_CHANGED"
+ println("messageChanged: " + t)
+ println("message: " + e.getMessage())
+ }
+}
+*/
+
+object MyFolderListener extends FolderListener {
+ def folderCreated(e: FolderEvent) {
+ println("folder created")
+ }
+
+ def folderDeleted(e: FolderEvent) {
+ println("folder deleted")
+ }
+
+ def folderRenamed(e: FolderEvent) {
+ println("folder renamed")
+ }
+}
+
+object MyStoreListener extends StoreListener {
+ def notification(e: StoreEvent) {
+ val t = if(e.getMessageType() == StoreEvent.NOTICE) "NOTICE" else "ALERT"
+ println("store event: " + t)
+ println("message: " + e.getMessage())
+ }
+}
+
+class Dao(c: Connection) extends Closeable {
+ def close() {
+ c.close()
+ }
+
+ val s = c.prepareStatement("BEGIN")
+ def begin() { s.executeUpdate() }
+ def commit() { c.commit() }
+ def rollback() { c.rollback() }
+
+ val countMail = c.prepareStatement("SELECT count(uid) FROM mail WHERE uid=?")
+ def countMail(uid: Long): Long = {
+ countMail.setLong(1, uid)
+ val rs = countMail.executeQuery()
+ rs.next()
+ val count = rs.getLong(1)
+ rs.close()
+ count
+ }
+
+ val insertMail = c.prepareStatement(
+ "INSERT INTO mail(uid, header_keys, header_values) VALUES(?, ?, ?)")
+ def insertMail(uid: Long, keys: Array[String], values: Array[String]) {
+ insertMail.setLong(1, uid)
+ insertMail.setArray(2, c.createArrayOf("varchar", keys.asInstanceOf[Array[Object]]))
+ insertMail.setArray(3, c.createArrayOf("varchar", values.asInstanceOf[Array[Object]]))
+ insertMail.executeUpdate()
+ }
+
+ val insertMailPart = c.prepareStatement(
+ "INSERT INTO mail_part(mail_uid, index, header_keys, header_values, data) VALUES(?, ?, ?, ?, ?)")
+ def insertMailPart(uid: Long, index: Int, keys: Array[String], values: Array[String], is: InputStream, size: Int) {
+ insertMailPart.setLong(1, uid)
+ insertMailPart.setInt(2, index)
+ insertMailPart.setArray(3, c.createArrayOf("varchar", keys.asInstanceOf[Array[Object]]))
+ insertMailPart.setArray(4, c.createArrayOf("varchar", values.asInstanceOf[Array[Object]]))
+ insertMailPart.setBinaryStream(5, is, size)
+ insertMailPart.executeUpdate()
+ }
+}
+
+object Service {
+ def headers(part: Part): (Array[String], Array[String]) = {
+ var headers = part.getAllHeaders.asInstanceOf[java.util.Enumeration[Header]].asScala.toList
+ var (names, values) = headers.unzip({
+ header => println(header.getName + ": " + header.getValue)
+ (header.getName, header.getValue)
+ })
+ return (names.toArray, values.toArray)
+ }
+ def processMail(dao: Dao, uid: Long, message: Message) {
+ println("Importing message, uid=" + uid)
+
+ var (names, values) = headers(message)
+ dao.insertMail(uid, names, values)
+
+ /*
+ message.getFrom.foreach { address =>
+ if(address.isInstanceOf[InternetAddress]) {
+ val a = address.asInstanceOf[InternetAddress]
+ println("From: " + a.getPersonal + " <" + a.getAddress + ">")
+ }
+ else
+ println("From: " + address)
+ }
+ message.getAllRecipients.foreach { address =>
+ println("To: " + address)
+ }
+ println("Subject: " + message.getSubject)
+ */
+
+ if(!message.isInstanceOf[MimeMessage])
+ error("Unprocessable: not a mime message")
+ else {
+ val m = message.asInstanceOf[MimeMessage]
+ /*
+ println("Mime mail")
+ println("Content-Type: " + m.getContentType)
+ */
+ val content = m.getContent
+// println("Is multipart? " + m.getContent.isInstanceOf[Multipart])
+ if(m.getContent.isInstanceOf[Multipart]) {
+ val multipart = m.getContent.asInstanceOf[Multipart]
+ 0.until(multipart.getCount).foreach { i =>
+ val body = multipart.getBodyPart(i)
+ var (names, values) = headers(body)
+ /*
+ println("Part #" + i)
+ println("Content-Type: " + body.getContentType)
+ println("Description: " + body.getDescription)
+ println("Content-Disposition: " + body.getDisposition)
+ println("FileName: " + body.getFileName)
+ println("Downloading part #" + i)
+ */
+ val start = System.currentTimeMillis
+ var bytes = IOUtils.toByteArray(m.getInputStream)
+ val end = System.currentTimeMillis
+ println("Downloaded " + bytes.length + " bytes in " + (end - start) + "ms")
+ dao.insertMailPart(uid, i, names, values, new ByteArrayInputStream(bytes), bytes.length)
+ }
+ }
+ else {
+ // TODO: what to do?
+ }
+ }
+ }
+}
+
+class ConnectionHandler(f: IMAPFolder, dao: Dao) {
+ def tx[T](f: Dao => T): Option[T] = {
+ dao.begin()
+ try {
+ val t = f(dao)
+ dao.commit()
+ Some(t)
+ } catch {
+ case e =>
+ e.printStackTrace
+ dao.rollback()
+ None
+ }
+ }
+
+ def process() {
+ val deque = new LinkedBlockingDeque[Long]()
+
+ println("Folder: " + f.getFullName())
+ f.open(Folder.READ_WRITE)
+ println("Folder has " + f.getMessageCount() + " messages, " + f.getNewMessageCount + " new and " + f.getDeletedMessageCount + " deleted.")
+ // f.addMessageChangedListener(MyMessageChangedListener)
+ f.addMessageCountListener(new MyMessageCountListener(f, deque))
+
+ f.getMessages().foreach(processMessage)
+
+ do {
+ println("IDLE")
+ f.idle(true);
+ println("Waiting for message..")
+ /*
+ val message = deque.poll
+ if(message != null)
+ processMessage(message)
+ */
+ } while(true)
+ }
+
+ def processMessage(message: Message) {
+ val uid = f.getUID(message)
+ var ok = tx { dao =>
+ println(new Date)
+ if(dao.countMail(uid) > 0) {
+ println("Message already imported: uid=" + uid)
+ }
+ else {
+ Service.processMail(dao, uid, message)
+ }
+ }
+ if(ok.isDefined) {
+ message.setFlag(Flags.Flag.DELETED, true)
+ }
+ }
+}
+
+class ImapToDb(f: File) {
+ def work() {
+ val p = new Properties()
+ var r: InputStream = null
+ try {
+ r = new FileInputStream(f)
+ p.load(r)
+ } catch {
+ case _ =>
+ System.err.println("Could not read configuraion file: " + f)
+ return
+ } finally {
+ IOUtils.closeQuietly(r)
+ }
+
+ def remove(key: String) = {
+ val value = p.getProperty(key)
+ println(key + ":" + value)
+ if(value != null) {
+ p.remove(key)
+ Some(value)
+ }
+ else
+ None
+ }
+ val params = for {
+ dbUrl: String <- remove("db.url")
+ dbUsername: String <- remove("db.username")
+ dbPassword: String <- remove("db.password")
+ imapHost: String <- remove("imap.host")
+ imapUsername: String <- remove("imap.username")
+ imapPassword: String <- remove("imap.password")
+ } yield (dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword)
+
+ params match {
+ case None =>
+ System.err.println("Required properties:")
+ System.err.println(" db.url, db.username db.password")
+ System.err.println(" imap.host, imap.username imap.password")
+ case Some((dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword)) =>
+ for(klass: String <- remove("db.driver") if klass.length > 0)
+ Class.forName(klass)
+
+ val dbProps = new Properties()
+ println("Db properties: ")
+ p.asScala.foreach { case (key, value) if key.startsWith("db.") =>
+ dbProps.put(key.substring(3), value)
+ println(key + ": " + value)
+ }
+
+ val imapProps = new Properties()
+ println("Imap properties: ")
+ p.asScala.foreach { case (key, value) if key.startsWith("imap.") =>
+ imapProps.put(key.substring(5), value)
+ println(key + ": " + value)
+ }
+
+ val sf = new MailSSLSocketFactory()
+ sf.setTrustAllHosts(true)
+ imapProps.put("mail.imaps.ssl.socketFactory", sf)
+ val session = Session.getDefaultInstance(imapProps, null)
+ def connectToFolder() = {
+ val store = session.getStore("imaps")
+ store.connect(imapHost, imapUsername, imapPassword)
+ store.addConnectionListener(MyConnectionListener)
+ store.addStoreListener(MyStoreListener)
+ // store.addFolderListener(MyFolderListener)
+ val folder = store.getFolder("INBOX")
+ // folder.addConnectionListener(MyConnectionListener)
+ // folder.addFolderListener(MyFolderListener)
+ // folder.addMessageCountListener(MyMessageCountListener)
+ // folder.addMessageChangedListener(MyMessageChangedListener)
+ if(!folder.isInstanceOf[IMAPFolder]) {
+ error("folder is not an IMAPFolder")
+ }
+ folder.asInstanceOf[IMAPFolder]
+ }
+
+ def connectToDb() = {
+ val c = DriverManager.getConnection(dbUrl, dbProps)
+ c.setAutoCommit(false)
+ new Dao(c)
+ }
+
+ while(true) {
+ var folder: IMAPFolder = null
+ var dao: Dao = null
+ try {
+ folder = connectToFolder()
+ dao = connectToDb()
+ new ConnectionHandler(folder, dao).process()
+ } catch {
+ case e =>
+ System.out.println("Uncaught exception: " + e.getMessage)
+ } finally {
+ try { folder.getStore.close() } catch { case _ => }
+ IOUtils.closeQuietly(dao)
+ Thread.sleep(1000)
+ }
+ }
+ }
+ }
+
+}
+
+object ImapToDb extends App {
+ new ImapToDb(new File("imap-to-db.properties")).work()
+}