import com.sun.mail.imap._
import com.sun.mail.util.MailSSLSocketFactory
import java.io._
import java.sql.{Array => SqlArray, Date => SqlDate, _}
import java.util._
import java.util.concurrent._
import javax.mail._
import javax.mail.event._
import javax.mail.internet._
import org.apache.commons.io.IOUtils
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._

/** Connection listener that logs every connection state change to stdout. */
object MyConnectionListener extends ConnectionListener {
  def closed(e: ConnectionEvent): Unit = {
    println("MyConnectionListener: closed")
  }

  def disconnected(e: ConnectionEvent): Unit = {
    println("MyConnectionListener: disconnected")
  }

  def opened(e: ConnectionEvent): Unit = {
    println("MyConnectionListener: opened")
  }
}

/**
 * Message-count listener that records the UID of every added message.
 *
 * @param f     folder used to resolve a message to its UID
 * @param deque receives the UID of each added message; `push` inserts at the
 *              head, so the most recently added UID is always first
 */
class MyMessageCountListener(f: IMAPFolder, deque: Deque[Long]) extends MessageCountListener {

  /** Logs the number of added messages and pushes each message's UID onto `deque`. */
  def messagesAdded(e: MessageCountEvent): Unit = {
    println("messagesAdded: added count=" + e.getMessages.length)
    e.getMessages.foreach { message =>
      val uid = f.getUID(message)
      println("uid=" + uid)
      deque.push(uid)
    }
  }

  /** Logs the number of removed messages and the sender(s) of each one. */
  def messagesRemoved(e: MessageCountEvent): Unit = {
    println("messagesRemoved: removed count=" + e.getMessages.length + ", isRemoved=" + e.isRemoved)
    e.getMessages.foreach { message =>
      // getFrom returns an array (or null when the header is absent); concatenating the
      // array itself would only print its JVM identity, so render the addresses instead.
      val from = Option(message.getFrom).map(_.mkString(", ")).getOrElse("<unknown>")
      println("message: " + from)
    }
  }
}

/*
object MyMessageChangedListener extends MessageChangedListener {
  def messageChanged(e: MessageChangedEvent) {
    val t = if(e.getMessageChangeType() == MessageChangedEvent.FLAGS_CHANGED) "FLAGS_CHANGED" else "ENVELOPE_CHANGED"
    println("messageChanged: " + t)
    println("message: " + e.getMessage())
  }
}
*/

/** Folder listener that logs folder creation, deletion and renaming to stdout. */
object MyFolderListener extends FolderListener {
  def folderCreated(e: FolderEvent): Unit = {
    println("folder created")
  }

  def folderDeleted(e: FolderEvent): Unit = {
    println("folder deleted")
  }

  def folderRenamed(e: FolderEvent): Unit = {
    println("folder renamed")
  }
}

/** Store listener that logs the type and text of every store notification to stdout. */
object MyStoreListener extends StoreListener {
  def notification(e: StoreEvent): Unit = {
    // Anything that is not a NOTICE is reported as an ALERT.
    val t = if (e.getMessageType() == StoreEvent.NOTICE) "NOTICE" else "ALERT"
    println("store event: " + t)
    println("message: " + e.getMessage())
  }
}
/**
 * Data-access object for the `mail` and `mail_part` tables.
 *
 * All statements are prepared once against the supplied connection and reused.
 * Note that the prepared statements share their names with the methods that
 * execute them (`countMail`, `insertMail`, `insertMailPart`).
 *
 * @param c JDBC connection that every statement runs on; closed by [[close]]
 */
class Dao(c: Connection) extends Closeable {

  /** Closes the underlying JDBC connection. */
  def close(): Unit = {
    c.close()
  }

  val s = c.prepareStatement("BEGIN")

  /** Executes the `BEGIN` statement. */
  def begin(): Unit = {
    s.executeUpdate()
  }

  /** Commits the current transaction on the connection. */
  def commit(): Unit = {
    c.commit()
  }

  /** Rolls back the current transaction on the connection. */
  def rollback(): Unit = {
    c.rollback()
  }

  val countMail = c.prepareStatement("SELECT count(uid) FROM mail WHERE uid=?")

  /**
   * Counts the rows of `mail` that have the given UID.
   *
   * @param uid message UID to look up
   * @return number of matching rows
   */
  def countMail(uid: Long): Long = {
    countMail.setLong(1, uid)
    val rs = countMail.executeQuery()
    // Close the result set even when reading the count fails.
    try {
      rs.next()
      rs.getLong(1)
    } finally {
      rs.close()
    }
  }

  val insertMail = c.prepareStatement(
    "INSERT INTO mail(uid, header_keys, header_values) VALUES(?, ?, ?)")

  /**
   * Inserts one row into `mail`.
   *
   * @param uid    message UID
   * @param keys   header names
   * @param values header values, parallel to `keys`
   */
  def insertMail(uid: Long, keys: Array[String], values: Array[String]): Unit = {
    insertMail.setLong(1, uid)
    insertMail.setArray(2, varcharArray(keys))
    insertMail.setArray(3, varcharArray(values))
    insertMail.executeUpdate()
  }

  val insertMailPart = c.prepareStatement(
    "INSERT INTO mail_part(mail_uid, index, header_keys, header_values, data) VALUES(?, ?, ?, ?, ?)")

  /**
   * Inserts one row into `mail_part`.
   *
   * @param uid    UID of the message the part belongs to
   * @param index  position of the part within the message
   * @param keys   header names of the part
   * @param values header values, parallel to `keys`
   * @param is     stream supplying the part's bytes
   * @param size   number of bytes to read from `is`
   */
  def insertMailPart(uid: Long, index: Int, keys: Array[String], values: Array[String],
                     is: InputStream, size: Int): Unit = {
    insertMailPart.setLong(1, uid)
    insertMailPart.setInt(2, index)
    insertMailPart.setArray(3, varcharArray(keys))
    insertMailPart.setArray(4, varcharArray(values))
    insertMailPart.setBinaryStream(5, is, size)
    insertMailPart.executeUpdate()
  }

  /** Wraps the strings in a SQL `varchar` array created on the connection. */
  private def varcharArray(items: Array[String]) =
    c.createArrayOf("varchar", items.asInstanceOf[Array[Object]])
}

/** Stateless helpers that copy a mail message into the database through a [[Dao]]. */
object Service {

  /**
   * Extracts all headers of a part, printing each one as `name: value`.
   *
   * @param part message or body part to read the headers from
   * @return header names and header values as two parallel arrays
   */
  def headers(part: Part): (Array[String], Array[String]) = {
    val all = part.getAllHeaders.asInstanceOf[java.util.Enumeration[Header]].asScala.toList
    val pairs = all.map { header =>
      println(header.getName + ": " + header.getValue)
      (header.getName, header.getValue)
    }
    val (names, values) = pairs.unzip
    (names.toArray, values.toArray)
  }

  /**
   * Stores a message: one `mail` row with its headers plus one `mail_part` row
   * per body part. A message whose content is not a `Multipart` is stored as a
   * single part with index 0 that carries the message's own headers.
   *
   * @param dao     data-access object to write through
   * @param uid     UID of the message
   * @param message message to import
   * @throws RuntimeException if `message` is not a `MimeMessage`
   */
  def processMail(dao: Dao, uid: Long, message: Message): Unit = {
    println("Importing message, uid=" + uid)
    val (names, values) = headers(message)
    dao.insertMail(uid, names, values)

    message match {
      case m: MimeMessage =>
        m.getContent match {
          case multipart: Multipart =>
            for (i <- 0 until multipart.getCount) {
              val body = multipart.getBodyPart(i)
              val (partNames, partValues) = headers(body)
              storePart(dao, uid, i, partNames, partValues, body)
            }
          case _ =>
            // Without this the caller would flag the message as deleted although only
            // its headers were saved. NOTE(review): confirm that index 0 with the
            // message headers is the desired representation of a single-part mail.
            storePart(dao, uid, 0, names, values, m)
        }
      case _ =>
        sys.error("Unprocessable: not a mime message")
    }
  }

  /** Downloads the content of `part` into memory and writes it as one `mail_part` row. */
  private def storePart(dao: Dao, uid: Long, index: Int, names: Array[String],
                        values: Array[String], part: Part): Unit = {
    val start = System.currentTimeMillis
    // Read the part's own stream (not the enclosing message's) and always release it.
    val in = part.getInputStream
    val bytes = try {
      IOUtils.toByteArray(in)
    } finally {
      IOUtils.closeQuietly(in)
    }
    val end = System.currentTimeMillis
    println("Downloaded " + bytes.length + " bytes in " + (end - start) + "ms")
    dao.insertMailPart(uid, index, names, values, new ByteArrayInputStream(bytes), bytes.length)
  }
}

/**
 * Imports the messages of one IMAP folder into the database and then keeps
 * waiting on the folder for new messages.
 *
 * @param f   folder to import from; opened by [[process]]
 * @param dao data-access object used for every import
 */
class ConnectionHandler(f: IMAPFolder, dao: Dao) {

  /**
   * Runs `f` between `dao.begin()` and `dao.commit()`.
   *
   * The parameter `f` shadows the folder of the same name inside this method.
   *
   * @param f work to perform with the DAO
   * @return `Some(result)` after a successful commit; `None` if an exception was
   *         thrown, in which case the stack trace is printed and the transaction
   *         is rolled back
   */
  def tx[T](f: Dao => T): Option[T] = {
    dao.begin()
    try {
      val result = f(dao)
      dao.commit()
      Some(result)
    } catch {
      // Only exceptions are handled here; JVM errors are left to propagate.
      case e: Exception =>
        e.printStackTrace()
        dao.rollback()
        None
    }
  }

  /**
   * Opens the folder read-write, imports every message currently in it and then
   * loops forever: wait in IDLE, import the messages that were announced.
   * Returns only by throwing.
   */
  def process(): Unit = {
    val deque = new LinkedBlockingDeque[Long]()
    println("Folder: " + f.getFullName())
    f.open(Folder.READ_WRITE)
    println("Folder has " + f.getMessageCount() + " messages, " + f.getNewMessageCount +
      " new and " + f.getDeletedMessageCount + " deleted.")

    // f.addMessageChangedListener(MyMessageChangedListener)
    f.addMessageCountListener(new MyMessageCountListener(f, deque))

    f.getMessages().foreach(processMessage)

    while (true) {
      println("IDLE")
      f.idle(true)
      println("Waiting for message..")
      importQueued(deque)
    }
  }

  /**
   * Imports every message whose UID is currently queued.
   *
   * NOTE(review): if listener callbacks are delivered asynchronously, a UID may
   * only show up here on a later wake-up - confirm against the mail library.
   */
  private def importQueued(pending: Deque[Long]): Unit = {
    while (!pending.isEmpty) {
      // MyMessageCountListener pushes at the head, so the tail holds the oldest UID.
      val uid = pending.pollLast()
      Option(f.getMessageByUID(uid)) match {
        case Some(message) => processMessage(message)
        case None => println("Message no longer available: uid=" + uid)
      }
    }
  }

  /**
   * Imports one message inside a transaction unless its UID is already stored,
   * then flags it as deleted if the transaction committed.
   *
   * @param message message to import
   */
  def processMessage(message: Message): Unit = {
    val uid = f.getUID(message)
    val outcome = tx { dao =>
      println(new Date)
      if (dao.countMail(uid) > 0) {
        println("Message already imported: uid=" + uid)
      } else {
        Service.processMail(dao, uid, message)
      }
    }
    // Never flag a message whose import was rolled back.
    if (outcome.isDefined) {
      message.setFlag(Flags.Flag.DELETED, true)
    }
  }
}

/**
 * Application driver: reads the configuration file, connects to the IMAP
 * server and the database, and keeps importing mail, reconnecting after
 * every failure.
 *
 * @param f properties file holding the `db.*` and `imap.*` settings
 */
class ImapToDb(f: File) {

  /**
   * Loads the configuration and runs the import loop. Returns after printing
   * a message to stderr when the file cannot be read or a required property
   * is missing; otherwise it does not return normally.
   */
  def work(): Unit = {
    loadProperties() match {
      case Some(p) => run(p)
      case None => System.err.println("Could not read configuraion file: " + f)
    }
  }

  /** Reads the configuration file; `None` if it cannot be opened or parsed. */
  private def loadProperties(): Option[Properties] = {
    try {
      val in = new FileInputStream(f)
      try {
        val p = new Properties()
        p.load(in)
        Some(p)
      } finally {
        IOUtils.closeQuietly(in)
      }
    } catch {
      case e: Exception => None
    }
  }

  /** Runs `close`, reporting instead of propagating any exception it throws. */
  private def closeLogged(what: String)(close: => Unit): Unit = {
    try {
      close
    } catch {
      case e: Exception => System.err.println("Could not close " + what + ": " + e.getMessage)
    }
  }

  /** Validates the settings in `p` and runs the connect/import/reconnect loop. */
  private def run(p: Properties): Unit = {

    // Takes a property out of `p` so that only pass-through settings remain in it.
    def remove(key: String): Option[String] = {
      val value = p.getProperty(key)
      // Do not echo credentials to the console.
      val shown = if (value != null && key.endsWith("password")) "********" else value
      println(key + ":" + shown)
      if (value != null) {
        p.remove(key)
        Some(value)
      } else {
        None
      }
    }

    val params = for {
      dbUrl <- remove("db.url")
      dbUsername <- remove("db.username")
      dbPassword <- remove("db.password")
      imapHost <- remove("imap.host")
      imapUsername <- remove("imap.username")
      imapPassword <- remove("imap.password")
    } yield (dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword)

    params match {
      case None =>
        System.err.println("Required properties:")
        System.err.println(" db.url, db.username db.password")
        System.err.println(" imap.host, imap.username imap.password")

      case Some((dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword)) =>
        for (klass <- remove("db.driver") if klass.length > 0) {
          Class.forName(klass)
        }

        val dbProps = new Properties()
        // The credentials were taken out of `p` above, so hand them to JDBC explicitly.
        // Explicit db.user / db.password.* settings copied below still take precedence.
        dbProps.put("user", dbUsername)
        dbProps.put("password", dbPassword)
        println("Db properties: ")
        // Filter by prefix; entries of the other group must be skipped, not fail the match.
        for ((key, value) <- p.asScala if key.startsWith("db.")) {
          dbProps.put(key.substring(3), value)
          println(key + ": " + value)
        }

        val imapProps = new Properties()
        println("Imap properties: ")
        for ((key, value) <- p.asScala if key.startsWith("imap.")) {
          imapProps.put(key.substring(5), value)
          println(key + ": " + value)
        }

        val sf = new MailSSLSocketFactory()
        sf.setTrustAllHosts(true)
        imapProps.put("mail.imaps.ssl.socketFactory", sf)

        val session = Session.getDefaultInstance(imapProps, null)

        // Connects to the store and returns its INBOX; the store is closed again on failure.
        def connectToFolder(): IMAPFolder = {
          val store = session.getStore("imaps")
          try {
            store.connect(imapHost, imapUsername, imapPassword)
            store.addConnectionListener(MyConnectionListener)
            store.addStoreListener(MyStoreListener)
            // store.addFolderListener(MyFolderListener)
            store.getFolder("INBOX") match {
              case imapFolder: IMAPFolder => imapFolder
              case _ => sys.error("folder is not an IMAPFolder")
            }
          } catch {
            case e: Exception =>
              closeLogged("store")(store.close())
              throw e
          }
        }

        // Opens a connection with auto-commit off; the connection is closed again on failure.
        def connectToDb(): Dao = {
          val c = DriverManager.getConnection(dbUrl, dbProps)
          try {
            c.setAutoCommit(false)
            new Dao(c)
          } catch {
            case e: Exception =>
              closeLogged("database connection")(c.close())
              throw e
          }
        }

        while (true) {
          var folder: Option[IMAPFolder] = None
          var dao: Option[Dao] = None
          try {
            val connectedFolder = connectToFolder()
            folder = Some(connectedFolder)
            val connectedDao = connectToDb()
            dao = Some(connectedDao)
            new ConnectionHandler(connectedFolder, connectedDao).process()
          } catch {
            case e: Exception =>
              System.out.println("Uncaught exception: " + e.getMessage)
          } finally {
            // Cleanup must never throw, otherwise the reconnect loop would end.
            folder.foreach(fl => closeLogged("store")(fl.getStore.close()))
            dao.foreach(d => closeLogged("database connection")(d.close()))
            Thread.sleep(1000)
          }
        }
    }
  }
}

/** Entry point: runs the importer with `imap-to-db.properties` from the working directory. */
object ImapToDb extends App {
  new ImapToDb(new File("imap-to-db.properties")).work()
}