Repartitioning changes the row order of DataFrames in Spark

I want to understand what happens to my data frame after applying the .repartition function. If my original data frame is:

+--------+------+--------------+-------+-----+
|integers|floats|integer_arrays|letters|nulls|
+--------+------+--------------+-------+-----+
|       1|  -1.0|        [1, 2]|      a|    1|
|       2|   0.5|     [3, 4, 5]|      b| null|
|       3|   2.7|  [6, 7, 8, 9]|      c|    2|
+--------+------+--------------+-------+-----+

And I run:

df.repartition(10).show()

The resulting data frame contains the rows in a different order:

+--------+------+--------------+-------+-----+
|integers|floats|integer_arrays|letters|nulls|
+--------+------+--------------+-------+-----+
|       3|   2.7|  [6, 7, 8, 9]|      c|    2|
|       2|   0.5|     [3, 4, 5]|      b| null|
|       1|  -1.0|        [1, 2]|      a|    1|
+--------+------+--------------+-------+-----+

Why does the order of the rows change?

What happens to a DataFrame with 3 rows that is divided into 10 partitions?

Can I see the assigned partitions?

Thanks for your help.
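For reference, one way to inspect which partition each row ends up in is the spark_partition_id function. A minimal PySpark sketch, assuming df is the three-row DataFrame above:

from pyspark.sql.functions import spark_partition_id

repartitioned = df.repartition(10)

# Number of partitions after repartitioning (10, most of them empty with only 3 rows)
print(repartitioned.rdd.getNumPartitions())

# Tag each row with the ID of the partition it was assigned to
repartitioned.withColumn("partition_id", spark_partition_id()).show()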

dataframe – Dumping an HBase table to a CSV file using Spark results in data loss

Exporting data from HBase to CSV with Phoenix and Spark results in data loss. I have 22 million rows in my HBase table, but when I export them to CSV, only 19 million rows come out; 3 million rows are missing.

I tried caching the DataFrame before writing to CSV, but the result is still 19 million rows. I used coalesce because I need everything in a single CSV file.

I also tried exporting with the !record command in Phoenix, but the problem there is that the data is huge and loading takes forever.

  1. !outputformat csv
  2. !record data.csv
  3. SELECT * FROM table;
  4. !record
  5. !quit

Is there any way I can export my HBase table without data loss? Or maybe someone could spot the problem in my code; any suggestions would be a great help.

My Spark code in Scala:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object ExportCSV {
  def main(args: Array[String]): Unit = {

    val username = "abc"
    val password = "abc"
    val table_name = "xyz"
    val phoenix_zk = "jdbc:phoenix:zkurl"

    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    val spark = SparkSession.builder().master("local[*]")
      .config("spark.debug.maxToStringFields", "100")
      //.config("spark.driver.maxResultSize", "2g")
      .config("spark.yarn.executor.memoryOverhead", "4g")
      .appName("SparkPhoenix")
      .getOrCreate()

    // Read the Phoenix table over JDBC
    val df = spark.read.format("jdbc").options(
      Map("driver" -> "org.apache.phoenix.jdbc.PhoenixDriver",
          "url" -> phoenix_zk,
          "dbtable" -> table_name)).load()

    print(df.count())  // 22 million rows in the DataFrame
    df.cache()
    print(df.count())  // 19 million rows after caching

    df.explain(extended = true)

    // Coalesce to a single partition so the output is a single CSV file
    df.coalesce(1).write.mode("append").option("header", "true").csv("/tchiring/export_test")
  }
}




Is there a way to compare a column value of the next row with the current row in a Dataset object in Spark using Java?

I am trying to convert an existing Dataset to a new schema, as shown below, using Java.

Current schema:

id roll_no student
1 1232 John
2 1456 anna
3 1832 mike
4 2002 Dave

It should turn into a new schema based on roll_no, adding a next_student column when a roll_no is adjacent to the current student's:

id roll_no student next_student
1 1232 John Mike
2 1234 anna dave
3 1233 mike anna
4 1235 Dave null

Is there a way to compare column values of different rows in the Dataset to do this? I would appreciate any hints for solving this problem.

Scala and PySpark expose lag and lead methods through the Window object, but I want to know if there is a way to do this in Java.
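For reference, the window-function pattern referred to above looks like this in PySpark; the Java Dataset API exposes the same pieces (org.apache.spark.sql.expressions.Window and org.apache.spark.sql.functions.lead). This is only a sketch of the lead technique, assuming ds holds the example table:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Order rows by roll_no and pull the student value from the following row
w = Window.orderBy("roll_no")
ds.withColumn("next_student", F.lead("student", 1).over(w)).show()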

python – Joining Spark DataFrames on a nearest-key condition

What is a high-performance method for creating fuzzy joins in PySpark?

I'm looking for community views on a scalable approach to joining large Spark DataFrames on a nearest-key condition. Let me illustrate this problem with a representative example. Suppose we have the following Spark DataFrame with events that occur at certain points in time:

ddf_event = spark.createDataFrame(
    data=[
        (1, 'A'),
        (5, 'A'),
        (10, 'B'),
        (15, 'A'),
        (20, 'B'),
        (25, 'B'),
        (30, 'A')
    ],
    schema=['ts_event', 'event']
)

and the following Spark DataFrame with GPS data measured at specific points in time:

ddf_gps = spark.createDataFrame(
    data=[
        (2, '(-46.84635, 173.13674)'),
        (4, '(2.50362, 104.34136)'),
        (8, '(-24.20741, 51.80755)'),
        (15, '(-59.07798, -20.49141)'),
        (18, '(-44.34468, -167.90401)'),
        (24, '(-18.84175, 16.68628)'),
        (27, '(20.48501,58.42423)')
    ],
    schema=['ts_gps', 'gps_coordinates']
)

We would like to join this to create the following resulting DataFrame:

+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates        |
+--------+-----+------+-----------------------+
|1       |A    |2     |(-46.84635, 173.13674) |
|5       |A    |4     |(2.50362, 104.34136)   |
|10      |B    |8     |(-24.20741, 51.80755)  |
|15      |A    |15    |(-59.07798, -20.49141) |
|20      |B    |18    |(-44.34468, -167.90401)|
|25      |B    |24    |(-18.84175, 16.68628)  |
|30      |A    |27    |(20.48501,58.42423)    |
+--------+-----+------+-----------------------+

In effect, we want to find the nearest GPS data point based on the event timestamp and the GPS data timestamp.

We thus face the problem of joining on a nearest-key condition, where "nearest" is defined as the smallest absolute difference between the two timestamps.

I investigated two approaches to achieve this: one based on a filtered Binned Join (FBJ) and one based on a filtered sorted union (FSU). Both approaches are described in more detail below.

The FBJ approach depends on the parameter bin_size, which limits the time window in which a matching GPS timestamp can be found. Increasing bin_size increases the computational load, while decreasing it reduces the quality of the results.

Neither approach seems to scale linearly with the size of the input DataFrames.

In practice, I have to deal with tens of millions of rows of input data, so I am currently at a loss for a workable solution to the problem.

FBJ approach

The FBJ approach consists of the following steps:

  1. Create a ts_bin column by binning the timestamp columns, implemented by:
from pyspark.sql import functions as F

bin_size = 10
ddf_event = ddf_event.withColumn(
    'ts_bin',
    F.round(F.col('ts_event') / bin_size)
)

ddf_gps = ddf_gps.withColumn(
    'ts_bin',
    F.round(F.col('ts_gps') / bin_size)
)
  2. Join the DataFrames on the ts_bin column, implemented by:
ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
  3. Determine the minimum timestamp difference, implemented by:
from pyspark.sql.window import Window

window = Window.partitionBy('ts_event')

ddf = ddf.withColumn(
    'ts_diff',
    F.abs(F.col('ts_gps') - F.col('ts_event'))
)

ddf = ddf.withColumn(
    'min_ts_diff',
    F.min(F.col('ts_diff')).over(window)
)
  4. Filter and select the relevant rows and columns, implemented by:
ddf = (
    ddf
    .where(
        (F.col('ts_diff') == F.col('min_ts_diff')) |
        (F.col('ts_diff').isNull())   
    )
    .select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)

Boundary cases for bin_size:

  • bin_size >> 1 effectively results in a full cross join
  • bin_size = 1 effectively results in a left join on ts_event == ts_gps

FSU approach

The FSU approach consists of the following steps:

  1. Union the DataFrames, implemented by:
def union(df1, df2):
    cols = list(set(df1.columns).union(set(df2.columns)))
    for col in cols:
        if col not in df1.columns:
            df1 = df1.withColumn(col, F.lit(None))
        if col not in df2.columns:
            df2 = df2.withColumn(col, F.lit(None))
    return df1.select(cols).union(df2.select(cols))

ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))
ddf = union(ddf_event, ddf_gps)
  2. Sort the resulting DataFrame and pick up the adjacent GPS timestamps, implemented by:
from sys import maxsize

last_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
    F.col('timestamp').asc()).rowsBetween(0, maxsize)

ddf = (
    ddf.withColumn(
        'prev_time',
        F.last(F.col('ts_gps'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'prev_coordinates',
        F.last(F.col('gps_coordinates'), ignorenulls=True)
         .over(last_window)
    ).withColumn(
        'next_time',
        F.first(F.col('ts_gps'), ignorenulls=True)
         .over(first_window)
    ).withColumn(
        'next_coordinates',
        F.first(F.col('gps_coordinates'), ignorenulls=True)
         .over(first_window)
    )
)
  3. Filter and select the relevant rows and columns, implemented by:
condition = (F.col('timestamp') - F.col('prev_time')
             < F.col('next_time') - F.col('timestamp'))

ddf = (
    ddf
    .where(F.col('event').isNotNull())
    .withColumn(
        'ts_gps',
        F.when(condition | F.col('next_time').isNull(), F.col('prev_time')).otherwise(F.col('next_time'))
    ).withColumn(
        'gps_coordinates',
        F.when(condition | F.col('next_time').isNull(),
               F.col('prev_coordinates'))
         .otherwise(F.col('next_coordinates'))
    ).select(
        'ts_event',
        'event',
        'ts_gps',
        'gps_coordinates'
    )
)
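In both cases the final ddf can be compared against the expected table above with an ordered show; a minimal check:

# Display the joined result in event-time order for comparison with the target table
ddf.select('ts_event', 'event', 'ts_gps', 'gps_coordinates') \
    .orderBy('ts_event') \
    .show(truncate=False)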

Apache Ambari: Spark user management for SSH access to the cluster

I just set up an Apache HDFS / Spark cluster for my organization. Regarding user management, I'm not sure how to control who can use the cluster for workloads. If I run the Spark shell directly on the master node, it works, but there are a lot of exceptions. I want to create roles/users who can access the shell, do Spark programming, and run jobs.

I tried searching the documentation. I can create users and groups in the UI, but I don't think that means they get SSH access to the cluster with those credentials.

Python – How do I run a Spark task with Luigi?

I have a lot of Spark jobs and I want to orchestrate them with Luigi. However, I get an error that I cannot fix.

Here is my basic script:

import luigi
from luigi.contrib.spark import PySparkTask
import pandas as pd

class rddTest(PySparkTask):

    def input(self):
        d = [1, 2, 3, 4, 5]
        return d

    def output(self):
        return luigi.LocalTarget("rdd.csv")

    def main(self, sc, *args):
        # Collect the RDD on the driver and write it out as CSV via pandas
        # (RDDs have no toPandas method; only DataFrames do)
        rdd1 = sc.parallelize(self.input())
        df = pd.DataFrame(rdd1.collect())
        df.to_csv(self.output().path)

if __name__ == "__main__":
    luigi.run()

My luigi.cfg is below. I work at a large company and was only recently approved to use Luigi, so I doubt the central scheduler is set up on my cluster.

[core]
default-scheduler-host:name_of_your_luigi_server
python-home-dir:$PYTHON_HOME

[spark]
spark-submit:$SPARK_HOME/bin/spark-submit
hadoop-conf-dir:$HADOOP_HOME
yarn-conf-dir:$YARN_HOME
master:yarn-client
num-executors:10
executor-memory: 64g
executor-memory: 140g

In this case, I expect a CSV file named rdd.csv in the same directory from which I started the script, but it is not created.

Using this launch command:

python sparkTest.py --local-scheduler rddTest

I get the error:

DEBUG: Checking if rddTest() is complete
INFO: Informed scheduler that task   rddTest__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: (pid 50029) Worker Worker(salt=326401154, workers=1, host=hpchdp2e, username=, pid=50029) running   rddTest()
INFO: Running command: $SPARK_HOME/bin/spark-submit --master yarn-client --name rddTest --executor-memory 140g --num-executors 10 /s/anaconda/users//miniconda2/lib/python2.7/site-packages/luigi/contrib/pyspark_runner.py /tmp/rddTestqRsQ4F/rddTest.pickle
ERROR: (pid 50029) Worker Worker(salt=326401154, workers=1, host=hpchdp2e, username=, pid=50029) failed    rddTest()
Traceback (most recent call last):
  File "/s/anaconda/users//miniconda2/lib/python2.7/site-packages/luigi/worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "/s/anaconda/users//miniconda2/lib/python2.7/site-packages/luigi/worker.py", line 141, in _run_get_new_deps
    task_gen = self.task.run()
  File "/s/anaconda/users//miniconda2/lib/python2.7/site-packages/luigi/contrib/spark.py", line 309, in run
    super(PySparkTask, self).run()
  File "/s/anaconda/users//miniconda2/lib/python2.7/site-packages/luigi/contrib/spark.py", line 66, in run
    super(SparkSubmitTask, self).run()
  File "/s/anaconda/users//miniconda2/lib/python2.7/site-packages/luigi/contrib/external_program.py", line 134, in run
    with self._proc_with_tracking_url_context(proc_args=args, proc_kwargs=kwargs) as proc:
  File "/s/anaconda/users//miniconda2/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/s/anaconda/users//miniconda2/lib/python2.7/site-packages/luigi/contrib/external_program.py", line 168, in _proc_with_tracking_url_context
    main_proc = subprocess.Popen(proc_args, **proc_kwargs)
  File "/s/anaconda/users//miniconda2/lib/python2.7/subprocess.py", line 394, in __init__
    errread, errwrite)
  File "/s/anaconda/users//miniconda2/lib/python2.7/subprocess.py", line 1047, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory

Any help or insight is welcome!

kafka – Apache Spark for security log correlation

We plan to build a security analytics tool based on the Elastic Stack and Kafka / Spark. We cannot find a good solution for custom log correlation.

Here are some examples of the rule logic:

1] When a system logs 5 failed login events for a user, it should trigger an alert or an event

2] When a user logs in from a blacklisted country, it should trigger an alert or an event

Which tool can we use with the above architecture to achieve this? Appreciate your input.
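For illustration only, rule 1 could be expressed in Spark Structured Streaming as a windowed count over a Kafka topic. A rough sketch; the topic name, broker address, and JSON field names (user, status, ts) are assumptions, not part of the question:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("failed-login-rule").getOrCreate()

# Read raw authentication events from Kafka and parse the JSON payload
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "auth-events")
    .load()
    .select(F.from_json(F.col("value").cast("string"),
                        "user STRING, status STRING, ts TIMESTAMP").alias("e"))
    .select("e.*")
)

# Rule 1: five or more failed logins per user within a 10-minute window
alerts = (
    events
    .where(F.col("status") == "FAILED")
    .withWatermark("ts", "10 minutes")
    .groupBy(F.window("ts", "10 minutes"), "user")
    .count()
    .where(F.col("count") >= 5)
)

# Write alerts to the console; in practice this could feed Elasticsearch or another Kafka topic
query = alerts.writeStream.outputMode("update").format("console").start()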

[Vn5socks.net] Auto Update 24/7 – Good Socks 10h00 PM

LIVE ~ 220.79.34.109:2018 | 0.101 | Seoul | 11 | Unknown | Korea, Republic of | Checked at vn5socks.net
LIVE ~ 124.65.145.126:7302 | 0.063 | Beijing | 22 | Unknown | China | Checked at vn5socks.net
LIVE ~ 42.112.20.116:7200 | 0.002 | Hanoi | 44 | Unknown | Vietnam | Checked at vn5socks.net
LIVE ~ 47.254.25.174:25333 | 0.19 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 47.96.78.125:7302 | 0.074 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 2.234.226.32:17779 | 0.193 | Unknown | Unknown | Unknown | Italy | Checked at vn5socks.net
LIVE ~ 108.160.129.94:1081 | 0.085 | El Centro | CA | 92243 | United States | Checked at vn5socks.net
LIVE ~ 207.188.179.38:28728 | 0.24 | Sparks | NV | 89434 | United States | Checked at vn5socks.net
LIVE ~ 95.110.186.48:10175 | 0.202 | Unknown | Unknown | Unknown | Italy | Checked at vn5socks.net
LIVE ~ 169.239.221.90:50802 | 0.374 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
LIVE ~ 199.247.27.91:3355 | 0.229 | Yellowknife | NT | x1a2n3 | Canada | Checked at vn5socks.net
LIVE ~ 185.105.184.204:10023 | 0.359 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
LIVE ~ 207.188.177.4:28728 | 0.24 | Sparks | NV | 89434 | United States | Checked at vn5socks.net
LIVE ~ 50.62.61.96:57173 | 0.264 | Scottsdale | AZ | 85260 | United States | Checked at vn5socks.net
LIVE ~ 178.62.84.245:15085 | 0.204 | Bryansk | 10 | Unknown | Russian Federation | Checked at vn5socks.net
LIVE ~ 125.68.172.20:7302 | 0.31 | Chengdu | 32 | Unknown | China | Checked at vn5socks.net
LIVE ~ 49.76.133.212:1081 | 0.237 | Nanjing | 04 | Unknown | China | Checked at vn5socks.net
LIVE ~ 207.188.178.34:28728 | 0.272 | Sparks | NV | 89434 | United States | Checked at vn5socks.net

[Vn5socks.net] Auto Update 24/7 – Good Socks 12h30 PM

LIVE ~ 45.76.212.182:39039 | 0.079 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
LIVE ~ 42.112.20.116:7200 | 0.021 | Hanoi | 44 | Unknown | Vietnam | Checked at vn5socks.net
LIVE ~ 47.94.19.105:3001 | 0.066 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 178.62.84.245:15085 | 0.207 | Bryansk | 10 | Unknown | Russian Federation | Checked at vn5socks.net
LIVE ~ 220.79.34.109:2018 | 0.103 | Seoul | 11 | Unknown | Korea, Republic of | Checked at vn5socks.net
LIVE ~ 47.254.25.174:25333 | 0.192 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 167.99.170.63:9050 | 0.238 | Fort Worth | TX | 76104 | United States | Checked at vn5socks.net
LIVE ~ 117.82.151.145:1081 | 0.281 | Suzhou | 04 | Unknown | China | Checked at vn5socks.net
LIVE ~ 47.93.251.207:3001 | 0.062 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 185.105.184.204:10023 | 0.306 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
LIVE ~ 207.188.178.34:28728 | 0.237 | Sparks | NV | 89434 | United States | Checked at vn5socks.net
LIVE ~ 207.188.177.4:28728 | 0.294 | Sparks | NV | 89434 | United States | Checked at vn5socks.net
LIVE ~ 125.65.92.104:7302 | 0.222 | Chengdu | 32 | Unknown | China | Checked at vn5socks.net
LIVE ~ 207.188.179.38:28728 | 0.278 | Sparks | NV | 89434 | United States | Checked at vn5socks.net
LIVE ~ 50.62.61.96:57173 | 0.215 | Scottsdale | AZ | 85260 | United States | Checked at vn5socks.net
LIVE ~ 125.68.172.20:7302 | 0.242 | Chengdu | 32 | Unknown | China | Checked at vn5socks.net
LIVE ~ 169.239.221.90:50802 | 0.409 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
LIVE ~ 95.110.186.48:10175 | 0.199 | Unknown | Unknown | Unknown | Italy | Checked at vn5socks.net

[Vn5socks.net] Auto Update 24/7 – Good Socks 10h10 PM

LIVE ~ 45.76.212.182:39039 | 0.082 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
LIVE ~ 178.62.84.245:15085 | 0.199 | Bryansk | 10 | Unknown | Russian Federation | Checked at vn5socks.net
LIVE ~ 47.93.251.207:3001 | 0.066 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 207.188.179.38:28728 | 0.235 | Sparks | NV | 89434 | United States | Checked at vn5socks.net
LIVE ~ 59.66.132.17:1081 | 0.327 | Beijing | 22 | Unknown | China | Checked at vn5socks.net
LIVE ~ 207.188.177.4:28728 | 0.284 | Sparks | NV | 89434 | United States | Checked at vn5socks.net
LIVE ~ 185.105.184.204:10023 | 0.353 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
LIVE ~ 95.110.186.48:10175 | 0.199 | Unknown | Unknown | Unknown | Italy | Checked at vn5socks.net
LIVE ~ 47.94.19.105:3001 | 0.071 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 163.172.18.149:9050 | 0.313 | Southend | M5 | Unknown | Great Britain | Checked at vn5socks.net
LIVE ~ 117.82.151.145:1081 | 0.278 | Suzhou | 04 | Unknown | China | Checked at vn5socks.net
LIVE ~ 220.79.34.109:2018 | 0.122 | Seoul | 11 | Unknown | Korea, Republic of | Checked at vn5socks.net
LIVE ~ 125.68.172.20:7302 | 0.278 | Chengdu | 32 | Unknown | China | Checked at vn5socks.net
LIVE ~ 47.96.78.125:7302 | 0.065 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 47.254.25.174:25333 | 0.204 | Ottawa | ON | k1y4h7 | Canada | Checked at vn5socks.net
LIVE ~ 167.99.170.63:9050 | 0.28 | Fort Worth | TX | 76104 | United States | Checked at vn5socks.net
LIVE ~ 2.234.226.32:17779 | 0.2 | Unknown | Unknown | Unknown | Italy | Checked at vn5socks.net
LIVE ~ 169.239.221.90:50802 | 0.405 | Unknown | Unknown | Unknown | Unknown | Checked at vn5socks.net
