Cassandra Bulk Loader never exits on a null pointer exception, so my calling shell script doesn’t see the failure

Problem :

I wrote a script that processes files in a loop, calling an external process for each one. I check the exit code of that call to decide whether to move the files (on success). The problem is that when the process fails with an exception, it never exits. How can I detect that the exception occurred, so that the script can move on to the next files?

Relevant part of script

# Stream data
sstableloader -d $3 $tablepathfull

# On success, move data to target dir
if [[ $? != 0 ]]; then
    echo "Error: Table failed - $tablepathfull"
else
    echo "Table OK - $tablepathfull"
    trgtdir="$2/$hostname/$keyspacename/$typename/$timestamp/$keyspacename/$tablename"
    mkdir -p $trgtdir
    mv $tablepathfull/* $trgtdir
    rmdir $tablepathfull
fi

If there is no ‘official’ way, is it perhaps possible to capture the output (see below) of the process call, and simply kill the process once/if the exception occurs?

Exception output

Exception in thread "STREAM-OUT-/XX.XX.XXX.88" Exception in thread "STREAM-OUT-/XX.XX.XXX.92" java.lang.NullPointerException
    at org.apache.cassandra.streaming.ConnectionHandler$MessageHandler.signalCloseDone(ConnectionHandler.java:249)
    at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:375)
    at java.lang.Thread.run(Thread.java:744)
java.lang.NullPointerException
    at org.apache.cassandra.streaming.ConnectionHandler$MessageHandler.signalCloseDone(ConnectionHandler.java:249)
    at org.apache.cassandra.streaming.ConnectionHandler$OutgoingMessageHandler.run(ConnectionHandler.java:375)
    at java.lang.Thread.run(Thread.java:744)

Solution :

The only workaround I could come up with is to run the loader in a background subprocess, write its output to a file, and poll that file for exceptions:

TEMP_FILE='/tmp/some_file.txt'

function load_table() {
  if [ $# -lt 2 ]; then
    printf "1" > "${TEMP_FILE}"
    return 1
  fi

  local param1="$1"
  local table_full_path="$2"
  local exit_code

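  # Stream data; capture stderr too, because Java prints stack traces there
  sstableloader -d "${param1}" "${table_full_path}" >> "${TEMP_FILE}" 2>&1
  exit_code=$?

  # Record the exit code on its own line so it can be read back with tail -1
  printf "\n%s" "${exit_code}" >> "${TEMP_FILE}"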
}
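The stack trace of an uncaught Java exception is printed to stderr, so a log file that only receives stdout will never contain the word "Exception". The following is a small sketch of the difference; `fake_loader` is a hypothetical stand-in for sstableloader, not part of Cassandra.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for the loader: progress on stdout, stack trace on stderr.
fake_loader() {
  echo "progress: 50%"
  echo "java.lang.NullPointerException" >&2
}

stdout_only=$(mktemp)
both_streams=$(mktemp)

# Only stdout reaches the file (stderr is discarded here just to keep the demo quiet).
fake_loader >> "${stdout_only}" 2>/dev/null
# stdout and stderr both reach the file.
fake_loader >> "${both_streams}" 2>&1

echo "stdout only:  $(grep -c "Exception" "${stdout_only}") match(es)"
echo "both streams: $(grep -c "Exception" "${both_streams}") match(es)"
```

Only the second file contains the exception, which is why the loader's stderr has to be redirected into the file that gets polled.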

function is_process_running() {
  if [ $# -eq 0 ]; then
    return 1
  fi
  local process_id="$1"

  # Ask ps for exactly this PID; grepping the whole process list would also match substrings
  ps -p "${process_id}" > /dev/null 2>&1
}

function exceptions_count() {
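  # The exit status carries the count: 0 (success) means no exception was logged
  local count
  count=$(tail -10 "${TEMP_FILE}" 2>/dev/null | grep -c "Exception")
  return "${count}"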
}

…

load_table "$3" "${tablepathfull}" &

# $! holds the PID of the most recently started background job
load_table_job_pid=$!

# exceptions_count returns 0 (success) only while no exception has been logged
while is_process_running "${load_table_job_pid}" && exceptions_count; do
  sleep 5
done

exit_code=0
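if is_process_running "${load_table_job_pid}"; then
  # Still running, so an exception was logged: kill the job's whole process group.
  # The job only gets its own process group when job control is on (set -m);
  # without it, the group is the script's own and this would also kill the script.
  load_table_job_gid=$(ps -o pgid= -p "${load_table_job_pid}" | tr -d ' ')
  kill -TERM "-${load_table_job_gid}" >/dev/null 2>&1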
  exit_code=1
else
  exit_code=$(tail -1 "${TEMP_FILE}")
fi

rm -f "${TEMP_FILE}"

# Your code
# On success, move data to target dir
if [ "${exit_code}" -ne 0 ]; then
    echo "Error: Table failed - $tablepathfull"
else
    echo "Table OK - $tablepathfull"
    trgtdir="$2/$hostname/$keyspacename/$typename/$timestamp/$keyspacename/$tablename"
    mkdir -p "$trgtdir"
    mv "$tablepathfull"/* "$trgtdir"
    rmdir "$tablepathfull"
fi
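The same idea can also be folded into a single function that needs no exit code written to the file. This is only a sketch under some assumptions: `watch_command` and `POLL_INTERVAL` are hypothetical names, liveness is checked with `kill -0` instead of `ps`, and only the direct child is signalled, so a loader that spawns further processes would still need the process-group kill shown earlier.

```shell
#!/usr/bin/env bash
# watch_command LOGFILE PATTERN COMMAND [ARGS...]   (hypothetical helper)
# Runs COMMAND in the background with stdout and stderr written to LOGFILE.
# Returns COMMAND's own exit status, or 1 after killing it once PATTERN
# shows up in LOGFILE while COMMAND is still running.
watch_command() {
  local logfile="$1" pattern="$2"
  shift 2

  : > "${logfile}"                      # start with an empty log
  "$@" >> "${logfile}" 2>&1 &
  local pid=$!

  # kill -0 sends no signal; it only tests whether the process still exists
  while kill -0 "${pid}" 2>/dev/null; do
    if grep -q -- "${pattern}" "${logfile}"; then
      kill -TERM "${pid}" 2>/dev/null
      wait "${pid}" 2>/dev/null || true # reap the killed child
      return 1
    fi
    sleep "${POLL_INTERVAL:-5}"
  done

  # The command ended by itself: hand back its real exit status
  wait "${pid}"
}
```

A call such as `watch_command /tmp/some_file.txt "Exception" sstableloader -d "$3" "${tablepathfull}"` would then leave the result in `$?`, so the original success check can stay as it was.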

You can improve on this, for example by retrying a failed table a limited number of times before giving up.
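A retry count could look roughly like the sketch below. `retry` and `RETRY_DELAY` are hypothetical names introduced for illustration, and the wrapped command is assumed to return non-zero on failure.

```shell
#!/usr/bin/env bash
# retry MAX_ATTEMPTS COMMAND [ARGS...]   (hypothetical helper)
# Runs COMMAND until it succeeds or MAX_ATTEMPTS attempts have been made.
retry() {
  local max_attempts="$1"
  shift
  local attempt=1

  while true; do
    "$@" && return 0
    if [ "${attempt}" -ge "${max_attempts}" ]; then
      echo "Giving up after ${attempt} attempts: $*" >&2
      return 1
    fi
    attempt=$((attempt + 1))
    sleep "${RETRY_DELAY:-5}"           # pause before the next attempt
  done
}
```

For instance, `retry 3 load_one_table "$3" "${tablepathfull}"` would try up to three times, where `load_one_table` stands for whatever function wraps the load-and-watch steps for one table.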
