From d5a8420694bc9f5b269f46097829d3178e116923 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 14 Aug 2012 01:02:48 +0200 Subject: o Initial import. --- src/main/scala/main.scala | 350 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 350 insertions(+) create mode 100644 src/main/scala/main.scala (limited to 'src/main/scala') diff --git a/src/main/scala/main.scala b/src/main/scala/main.scala new file mode 100644 index 0000000..e8e1d6d --- /dev/null +++ b/src/main/scala/main.scala @@ -0,0 +1,350 @@ +import com.sun.mail.imap._ +import com.sun.mail.util.MailSSLSocketFactory +import java.io._ +import java.sql.{Array => SqlArray, Date => SqlDate, _} +import java.util._ +import java.util.concurrent._ +import javax.mail._ +import javax.mail.event._ +import javax.mail.internet._ +import org.apache.commons.io.IOUtils +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import scala.collection.JavaConverters._ + +object MyConnectionListener extends ConnectionListener { + + def closed(e: ConnectionEvent) { + println("MyConnectionListener: closed"); + } + + def disconnected(e: ConnectionEvent) { + println("MyConnectionListener: disconnected"); + } + + def opened(e: ConnectionEvent) { + println("MyConnectionListener: opened"); + } +} + +class MyMessageCountListener(f: IMAPFolder, deque: Deque[Long]) extends MessageCountListener { + def messagesAdded(e: MessageCountEvent) { + println("messagesAdded: added count=" + e.getMessages.length) + e.getMessages.foreach { message => + val uid = f.getUID(message) + println("uid=" + uid) + deque.push(uid) + } + } + + def messagesRemoved(e: MessageCountEvent) { + println("messagesRemoved: removed count=" + e.getMessages.length + ", isRemoved=" + e.isRemoved) + e.getMessages.foreach { message => + println("message: " + message.getFrom) + } + } +} + +/* +object MyMessageChangedListener extends MessageChangedListener { + def messageChanged(e: MessageChangedEvent) { + val t = if(e.getMessageChangeType() == MessageChangedEvent.FLAGS_CHANGED) "FLAGS_CHANGED" else "ENVELOPE_CHANGED" + println("messageChanged: " + t) + println("message: " + e.getMessage()) + } +} +*/ + +object MyFolderListener extends FolderListener { + def folderCreated(e: FolderEvent) { + println("folder created") + } + + def folderDeleted(e: FolderEvent) { + println("folder deleted") + } + + def folderRenamed(e: FolderEvent) { + println("folder renamed") + } +} + +object MyStoreListener extends StoreListener { + def notification(e: StoreEvent) { + val t = if(e.getMessageType() == StoreEvent.NOTICE) "NOTICE" else "ALERT" + println("store event: " + t) + println("message: " + e.getMessage()) + } +} + +class Dao(c: Connection) extends Closeable { + def close() { + c.close() + } + + val s = c.prepareStatement("BEGIN") + def begin() { s.executeUpdate() } + def commit() { c.commit() } + def rollback() { c.rollback() } + + val countMail = c.prepareStatement("SELECT count(uid) FROM mail WHERE uid=?") + def countMail(uid: Long): Long = { + countMail.setLong(1, uid) + val rs = countMail.executeQuery() + rs.next() + val count = rs.getLong(1) + rs.close() + count + } + + val insertMail = c.prepareStatement( + "INSERT INTO mail(uid, header_keys, header_values) VALUES(?, ?, ?)") + def insertMail(uid: Long, keys: Array[String], values: Array[String]) { + insertMail.setLong(1, uid) + insertMail.setArray(2, c.createArrayOf("varchar", keys.asInstanceOf[Array[Object]])) + insertMail.setArray(3, c.createArrayOf("varchar", values.asInstanceOf[Array[Object]])) + insertMail.executeUpdate() + } + + val insertMailPart = c.prepareStatement( + "INSERT INTO mail_part(mail_uid, index, header_keys, header_values, data) VALUES(?, ?, ?, ?, ?)") + def insertMailPart(uid: Long, index: Int, keys: Array[String], values: Array[String], is: InputStream, size: Int) { + insertMailPart.setLong(1, uid) + insertMailPart.setInt(2, index) + insertMailPart.setArray(3, c.createArrayOf("varchar", keys.asInstanceOf[Array[Object]])) + insertMailPart.setArray(4, c.createArrayOf("varchar", values.asInstanceOf[Array[Object]])) + insertMailPart.setBinaryStream(5, is, size) + insertMailPart.executeUpdate() + } +} + +object Service { + def headers(part: Part): (Array[String], Array[String]) = { + var headers = part.getAllHeaders.asInstanceOf[java.util.Enumeration[Header]].asScala.toList + var (names, values) = headers.unzip({ + header => println(header.getName + ": " + header.getValue) + (header.getName, header.getValue) + }) + return (names.toArray, values.toArray) + } + def processMail(dao: Dao, uid: Long, message: Message) { + println("Importing message, uid=" + uid) + + var (names, values) = headers(message) + dao.insertMail(uid, names, values) + + /* + message.getFrom.foreach { address => + if(address.isInstanceOf[InternetAddress]) { + val a = address.asInstanceOf[InternetAddress] + println("From: " + a.getPersonal + " <" + a.getAddress + ">") + } + else + println("From: " + address) + } + message.getAllRecipients.foreach { address => + println("To: " + address) + } + println("Subject: " + message.getSubject) + */ + + if(!message.isInstanceOf[MimeMessage]) + error("Unprocessable: not a mime message") + else { + val m = message.asInstanceOf[MimeMessage] + /* + println("Mime mail") + println("Content-Type: " + m.getContentType) + */ + val content = m.getContent +// println("Is multipart? " + m.getContent.isInstanceOf[Multipart]) + if(m.getContent.isInstanceOf[Multipart]) { + val multipart = m.getContent.asInstanceOf[Multipart] + 0.until(multipart.getCount).foreach { i => + val body = multipart.getBodyPart(i) + var (names, values) = headers(body) + /* + println("Part #" + i) + println("Content-Type: " + body.getContentType) + println("Description: " + body.getDescription) + println("Content-Disposition: " + body.getDisposition) + println("FileName: " + body.getFileName) + println("Downloading part #" + i) + */ + val start = System.currentTimeMillis + var bytes = IOUtils.toByteArray(m.getInputStream) + val end = System.currentTimeMillis + println("Downloaded " + bytes.length + " bytes in " + (end - start) + "ms") + dao.insertMailPart(uid, i, names, values, new ByteArrayInputStream(bytes), bytes.length) + } + } + else { + // TODO: what to do? + } + } + } +} + +class ConnectionHandler(f: IMAPFolder, dao: Dao) { + def tx[T](f: Dao => T): Option[T] = { + dao.begin() + try { + val t = f(dao) + dao.commit() + Some(t) + } catch { + case e => + e.printStackTrace + dao.rollback() + None + } + } + + def process() { + val deque = new LinkedBlockingDeque[Long]() + + println("Folder: " + f.getFullName()) + f.open(Folder.READ_WRITE) + println("Folder has " + f.getMessageCount() + " messages, " + f.getNewMessageCount + " new and " + f.getDeletedMessageCount + " deleted.") + // f.addMessageChangedListener(MyMessageChangedListener) + f.addMessageCountListener(new MyMessageCountListener(f, deque)) + + f.getMessages().foreach(processMessage) + + do { + println("IDLE") + f.idle(true); + println("Waiting for message..") + /* + val message = deque.poll + if(message != null) + processMessage(message) + */ + } while(true) + } + + def processMessage(message: Message) { + val uid = f.getUID(message) + var ok = tx { dao => + println(new Date) + if(dao.countMail(uid) > 0) { + println("Message already imported: uid=" + uid) + } + else { + Service.processMail(dao, uid, message) + } + } + if(ok.isDefined) { + message.setFlag(Flags.Flag.DELETED, true) + } + } +} + +class ImapToDb(f: File) { + def work() { + val p = new Properties() + var r: InputStream = null + try { + r = new FileInputStream(f) + p.load(r) + } catch { + case _ => + System.err.println("Could not read configuraion file: " + f) + return + } finally { + IOUtils.closeQuietly(r) + } + + def remove(key: String) = { + val value = p.getProperty(key) + println(key + ":" + value) + if(value != null) { + p.remove(key) + Some(value) + } + else + None + } + val params = for { + dbUrl: String <- remove("db.url") + dbUsername: String <- remove("db.username") + dbPassword: String <- remove("db.password") + imapHost: String <- remove("imap.host") + imapUsername: String <- remove("imap.username") + imapPassword: String <- remove("imap.password") + } yield (dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword) + + params match { + case None => + System.err.println("Required properties:") + System.err.println(" db.url, db.username db.password") + System.err.println(" imap.host, imap.username imap.password") + case Some((dbUrl, dbUsername, dbPassword, imapHost, imapUsername, imapPassword)) => + for(klass: String <- remove("db.driver") if klass.length > 0) + Class.forName(klass) + + val dbProps = new Properties() + println("Db properties: ") + p.asScala.foreach { case (key, value) if key.startsWith("db.") => + dbProps.put(key.substring(3), value) + println(key + ": " + value) + } + + val imapProps = new Properties() + println("Imap properties: ") + p.asScala.foreach { case (key, value) if key.startsWith("imap.") => + imapProps.put(key.substring(5), value) + println(key + ": " + value) + } + + val sf = new MailSSLSocketFactory() + sf.setTrustAllHosts(true) + imapProps.put("mail.imaps.ssl.socketFactory", sf) + val session = Session.getDefaultInstance(imapProps, null) + def connectToFolder() = { + val store = session.getStore("imaps") + store.connect(imapHost, imapUsername, imapPassword) + store.addConnectionListener(MyConnectionListener) + store.addStoreListener(MyStoreListener) + // store.addFolderListener(MyFolderListener) + val folder = store.getFolder("INBOX") + // folder.addConnectionListener(MyConnectionListener) + // folder.addFolderListener(MyFolderListener) + // folder.addMessageCountListener(MyMessageCountListener) + // folder.addMessageChangedListener(MyMessageChangedListener) + if(!folder.isInstanceOf[IMAPFolder]) { + error("folder is not an IMAPFolder") + } + folder.asInstanceOf[IMAPFolder] + } + + def connectToDb() = { + val c = DriverManager.getConnection(dbUrl, dbProps) + c.setAutoCommit(false) + new Dao(c) + } + + while(true) { + var folder: IMAPFolder = null + var dao: Dao = null + try { + folder = connectToFolder() + dao = connectToDb() + new ConnectionHandler(folder, dao).process() + } catch { + case e => + System.out.println("Uncaught exception: " + e.getMessage) + } finally { + try { folder.getStore.close() } catch { case _ => } + IOUtils.closeQuietly(dao) + Thread.sleep(1000) + } + } + } + } + +} + +object ImapToDb extends App { + new ImapToDb(new File("imap-to-db.properties")).work() +} -- cgit v1.2.3