Integrating Spark with Elasticsearch

Author: JerryHouse · Categories: elasticsearch, data analysis · Published: 2017-10-09 12:11

Spark provides an excellent computing framework and rich programming interfaces for big-data processing and large-scale machine learning. elasticsearch-spark is the officially supported Elasticsearch connector for Spark (part of the elasticsearch-hadoop project); it supports both reading large volumes of data from Elasticsearch into Spark and writing large volumes of data from Spark into Elasticsearch.
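The two production tasks below are built around just two connector entry points: EsSpark.saveToEs for writing an RDD into an index, and sc.esRDD for reading an index back as a pair RDD. Here is a minimal sketch of both calls; the node address and the my_index/my_type resource name are placeholders, not values from the tasks below.

<pre>import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._                 // enables sc.esRDD(...)
import org.elasticsearch.spark.rdd.EsSpark       // enables EsSpark.saveToEs(...)

object MinimalEsRoundTrip {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("es_minimal")
      .set("es.nodes", "localhost")              // placeholder node address
    val sc = new SparkContext(conf)

    // Write: each Map becomes one Elasticsearch document
    val docs = sc.makeRDD(Seq(Map("user_id" -> 1, "city" -> "Beijing")))
    EsSpark.saveToEs(docs, "my_index/my_type")

    // Read: each element is (documentId, field map)
    sc.esRDD("my_index/my_type").take(10).foreach(println)
  }
}</pre>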

1. Importing into Elasticsearch

<pre>import scala.io.Source

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.elasticsearch.spark.rdd.EsSpark
import org.slf4j.{Logger, LoggerFactory}

object ReloadTask {
  val logger: Logger = LoggerFactory.getLogger("ReloadTask")

  def main(args: Array[String]) {
    val optionsList = args.map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => (opt -> v)
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = Map(optionsList: _*)

    if(!(options contains "index") || !(options contains "type") || !(options contains "nodes")
      || !(options contains "sql") || !(options contains "date")) {
      println("The nodes, type, index, sql, date parameters should be provided")
      return
    }

    val nodes = options("nodes")
    val indexName = options("index")
    val indexType = options("type")
    val sqlFile = options("sql")
    val date = options("date")

    val conf = new SparkConf().setAppName("dmp_spark")
    conf.set("es.nodes", nodes)                  // Elasticsearch node(s) to connect to
    conf.set("es.index.auto.create", "false")    // fail fast if the target index does not already exist
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)


    // The SQL template contains five %s placeholders, all filled with the same date
    val joinSql: String = Source.fromFile(sqlFile).mkString
    val df1 = sqlContext.sql(joinSql.format(date, date, date, date, date))
    val columns1 = df1.columns
    // Convert each Row into a Map[String, Any]; the connector indexes each Map as one document
    val rdd1 = df1.map(r => handle(columns1, r))

    EsSpark.saveToEs(rdd1, indexName + "/" + indexType)
  }

  def handle(columns:Array[String], row: Row): Map[String, Any] = {
    val m1 = handleBasic(columns, row)
    val m2 = handleOrder(columns, row)
    val m3 = handleCart(columns, row)
    val m4 = handleBehaviour(columns, row)
    val m5 = handleCoupon(columns, row)
    m1 ++ m2 ++ m3 ++ m4 ++ m5
  }

  def handleBasic(columns:Array[String], row: Row): Map[String, Any] = {
    val userId = rawIntegerTag(row.getString(columns.indexOf("userid1")))
    val sCity = row.getString(columns.indexOf("scity"))
    val mobile = row.getString(columns.indexOf("mobile"))
    val hasMobile = if (mobile == null || mobile.isEmpty) 0 else 1
    val gender = genderTag(row.getString(columns.indexOf("gender")))
    val os = osTag(row.getString(columns.indexOf("lastterminal_os")))
    val email = row.getString(columns.indexOf("email"))
    val province = row.getString(columns.indexOf("sprovince"))
    val cookieid = row.getString(columns.indexOf("cookieid1"))
    val activateTime = timeTag(row.getString(columns.indexOf("cookie_activatetime")))
    val regTime = timeTag(row.getString(columns.indexOf("regtime")))
    val regChannel = row.getString(columns.indexOf("regchannel"))
    val regSource = row.getString(columns.indexOf("regsource"))
    val regOs = osTag(row.getString(columns.indexOf("regterminal_os")))
    val lastVisitTime = timeTag(row.getString(columns.indexOf("lastvisittime")))
    val pushOpenTime = timeTag(row.getString(columns.indexOf("lastpushopentime")))
    val noticeEnable = rawIntegerTag(row.getString(columns.indexOf("lastnoticeenable")))
    val userLevel = rawIntegerTag(row.getString(columns.indexOf("userlevel")))
    Map("user_id" -> userId, "city" -> sCity, "mobile" -> mobile, "has_mobile" -> hasMobile, "gender" -> gender, "os" -> os, "email" -> email,
      "province" -> province, "cookie_id" -> cookieid, "activate_time" -> activateTime, "register_time" -> regTime,
      "register_channel" -> regChannel, "register_platform" -> regSource, "register_os" -> regOs,
      "user_level" -> userLevel,
      "last_visit_time" -> lastVisitTime, "is_push_on" -> noticeEnable, "push_open_time" -> pushOpenTime
    )
  }

  // handleOrder, handleCart, handleBehaviour, handleCoupon and the tag helpers
  // (rawIntegerTag, genderTag, osTag, timeTag) are omitted here for brevity
}</pre>
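Assuming the job is packaged into a jar (the jar name, master and parameter values below are illustrative), it can be submitted with parameters in the -key=value form that the argument parser above expects:

<pre>spark-submit --class ReloadTask --master yarn \
    dmp-spark.jar \
    -nodes=es-node1:9200 -index=dmp_user -type=profile \
    -sql=/path/to/join.sql -date=2017-10-09</pre>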

2. Exporting from Elasticsearch

<pre>import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.elasticsearch.spark._

object SynProductIndexTask {
  def main(args: Array[String]) {
    val optionsList = args.map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => (opt -> v)
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }
    val options = Map(optionsList: _*)

    if(!(options contains "nodes") || !(options contains "date")) {
      println("The nodes parameters should be provided")
      return
    }

    val nodes = options("nodes")
    val date = options("date")
    val conf = new SparkConf().setAppName("syn_esproduct_spark")
    conf.set("es.nodes", nodes)
    conf.set("es.index.auto.create", "false")
    conf.set("es.mapping.date.rich", "false")   // return date fields as plain strings rather than rich date objects
    conf.set("spark.speculation", "false")      // disable speculative execution to avoid duplicate task attempts

    val sc = new SparkContext(conf)
    // Each element of the pair RDD is (documentId, map of document fields)
    val rdd1 = sc.esRDD("cmbjuyanghuo/item")

    val rdd2 = rdd1.map(line => {
      val map = line._2
      // Fields may be missing from individual documents, so fall back to sensible defaults
      val sproduct = map.getOrElse("sproduct", "")
      val description = map.getOrElse("description", "")
      val country = map.getOrElse("sCountryNameZh", "")
      val sellerLevel = map.getOrElse("sellerLevel", "")
      val orderuser = map.getOrElse("orderuser", 0f)
      val listucrate = map.getOrElse("listucrate", 0f)
      val listuc = map.getOrElse("listuc", 0f)
      val detailucrate = map.getOrElse("detailucrate", 0f)
      val detailuc = map.getOrElse("detailuc", 0f)
      val favuser = map.getOrElse("favuser", 0f)
      val rankfeaturesgmv = map.getOrElse("rankfeaturesgmv", 0f)
      val ordernum = map.getOrElse("ordernum", 0f)
      val comuser = map.getOrElse("comuser", 0f)
      val items = map.getOrElse("items", null)
      val clusterids = map.getOrElse("clusterids", null)
      val validstart = map.getOrElse("validstart", "")
      val validend = map.getOrElse("validend", "")
      val livestarttime = map.getOrElse("livestarttime", "")
      val liveendtime = map.getOrElse("liveendtime", "")
      val activitystarttime = map.getOrElse("activitystarttime", "")
      val activityendtime = map.getOrElse("activityendtime", "")
      val hiddenbegintime = map.getOrElse("hiddenbegintime", "")
      val hiddenendtime = map.getOrElse("hiddenendtime", "")
      val punishbegintime = map.getOrElse("punishbegintime", "")
      val punishendtime = map.getOrElse("punishendtime", "")
      val sproductid = map.getOrElse("sproductid", "")
      val productpunishtype = map.getOrElse("productpunishtype", null)
      val stock = map.getOrElse("stock", 0)
      val gmv = map.getOrElse("gmv", 0f)
      val sellerpunishtype = map.getOrElse("sellerpunishtype", null)
      val isselleractive = map.getOrElse("isselleractive", 0)
      val dsr = map.getOrElse("dsr", 0f)
      val iswalled = map.getOrElse("iswalled", 0)
      val ilivestatus = map.getOrElse("ilivestatus", 0)
      val rankscore = map.getOrElse("rankscore", 0f)
      val orderpoint = map.getOrElse("orderpoint", 0)
      val iliveid = map.getOrElse("iliveid", 0)
      Row(
        map("id"), sproductid, map("ibrandid"),
        map("ifirstcategoryid"), map("isecondcategoryid"), map("ithirdcategoryid"),
        map("sellercountryid"), map("fquoteprice"), map("priceid"),
        map("iuserid"), orderpoint, iliveid,
        items, clusterids, orderuser, listucrate,
        listuc, detailucrate,
        detailuc, favuser, rankfeaturesgmv, ordernum,
        comuser, rankscore,
        validstart, validend,
        livestarttime, liveendtime, ilivestatus,
        activitystarttime, activityendtime,
        hiddenbegintime, hiddenendtime,
        punishbegintime, punishendtime,
        isselleractive, stock, productpunishtype, iswalled,
        dsr, gmv, sellerpunishtype,
        sellerLevel, sproduct, description, country
      )
    })

    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = true),
      StructField("sproductid", StringType, nullable = true),
      StructField("ibrandid", IntegerType, nullable = true),
      StructField("ifirstcategoryid", IntegerType, nullable = true),
      StructField("isecondcategoryid", IntegerType, nullable = true),
      StructField("ithirdcategoryid", IntegerType, nullable = true),
      StructField("sellercountryid", IntegerType, nullable = true),
      StructField("fquoteprice", FloatType, nullable = true),
      StructField("priceid", IntegerType, nullable = true),
      StructField("iuserid", IntegerType, nullable = true),
      StructField("orderpoint", FloatType, nullable = true),
      StructField("iliveid", IntegerType, nullable = true),
      StructField("items", ArrayType(StringType, false), nullable = true),
      StructField("clusterids", ArrayType(StringType, false), nullable = true),
      StructField("orderuser", FloatType, nullable = true),
      StructField("listucrate", FloatType, nullable = true),
      StructField("listuc", FloatType, nullable = true),
      StructField("detailucrate", FloatType, nullable = true),

      StructField("detailuc", FloatType, nullable = true),
      StructField("favuser", FloatType, nullable = true),
      StructField("rankfeaturesgmv", FloatType, nullable = true),
      StructField("ordernum", FloatType, nullable = true),
      StructField("comuser", FloatType, nullable = true),
      StructField("rankscore", FloatType, nullable = true),

      StructField("validstart", StringType, nullable = true),
      StructField("validend", StringType, nullable = true),
      StructField("livestarttime", StringType, nullable = true),
      StructField("liveendtime", StringType, nullable = true),
      StructField("ilivestatus", IntegerType, nullable = true),
      StructField("activitystarttime", StringType, nullable = true),
      StructField("activityendtime", StringType, nullable = true),
      StructField("hiddenbegintime", StringType, nullable = true),
      StructField("hiddenendtime", StringType, nullable = true),
      StructField("punishbegintime", StringType, nullable = true),
      StructField("punishendtime", StringType, nullable = true),

      StructField("isselleractive", IntegerType, nullable = true),
      StructField("stock", IntegerType, nullable = true),
      StructField("productpunishtype", ArrayType(IntegerType, false), nullable = true),
      StructField("iswalled", IntegerType, nullable = true),
      StructField("dsr", FloatType, nullable = true),
      StructField("gmv", FloatType, nullable = true),
      StructField("sellerpunishtype", ArrayType(IntegerType, false), nullable = true),
      StructField("sellerLevel", StringType, nullable = true),
      StructField("sproduct", StringType, nullable = true),
      StructField("description", StringType, nullable = true),
      StructField("country", StringType, nullable = true)
    ))

    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val pDF = sqlContext.createDataFrame(rdd2, schema)
    pDF.write.mode("overwrite").parquet("/user/wangqiang/es_index/product/" + date)
  }
}</pre>
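The parquet snapshot can then be loaded back for ad-hoc inspection. A minimal sketch, assuming the same Spark 1.x HiveContext as above and one of the dated output directories:

<pre>val parquetDF = sqlContext.read.parquet("/user/wangqiang/es_index/product/2017071116")
parquetDF.registerTempTable("product_snapshot")
sqlContext.sql("SELECT id, sproduct, gmv FROM product_snapshot LIMIT 10").show()</pre>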

3. Build and Deployment

<pre><dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.24</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.10</artifactId>
        <version>${es.version}</version>
    </dependency>
</dependencies>
...</pre>

Note that the artifact name elasticsearch-spark-20_2.10 encodes both the targeted Spark branch (2.0) and the Scala version (2.10); the Scala suffix must match the _2.10 suffix of the spark-core and spark-hive dependencies above.
<pre><plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <version>${scala.maven.version}</version>
    <executions>
        <execution>
            <id>scala-compile-first</id>
            <goals>
                <goal>compile</goal>
            </goals>
            <configuration>
                <includes>
                    <include>**/*.scala</include>
                </includes>
                <excludes>
                    <exclude>*.properties</exclude>
                </excludes>
            </configuration>
        </execution>
        <execution>
            <id>scala-test-compile</id>
            <goals>
                <goal>testCompile</goal>
            </goals>
        </execution>
    </executions>
</plugin></pre>
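With the Scala sources compiled and packaged by mvn package, both tasks are submitted the same way; the jar name, master and resource settings below are illustrative:

<pre>mvn package
spark-submit --class SynProductIndexTask --master yarn --deploy-mode cluster \
    --num-executors 8 --executor-memory 4g \
    target/dmp-spark-1.0.jar \
    -nodes=es-node1:9200 -date=2017071116</pre>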


This article is from dcharm; when reposting, please credit the source and include a link.

Permanent link: http://www.dcharm.com/?p=691
