Skip to content

Commit

Permalink
PDI-8731 - Cassandra SSTable Output (bulkloader) step does not work w…
Browse files Browse the repository at this point in the history
…ith Cassandra 1.1.x

Resolve cassandra transitively to get runtime dependencies
Add support for cassandra 2.x
Split writer into CQL2 and CQL3 specific ones
Try to prevent the JVM from exiting in case of issues with the Cassandra config
Fixes for CQL2 writer:
 - read partitioner from config, use Murmur3Partitioner by default
 - store dates as longs

[PDI-8731][Tests] - Added IT and made some cosmetic changes
  • Loading branch information
pavel-sakun authored and mdamour1976 committed Aug 8, 2016
1 parent db896da commit ca08ec8
Show file tree
Hide file tree
Showing 19 changed files with 1,898 additions and 249 deletions.
34 changes: 18 additions & 16 deletions ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,34 @@

<dependencies defaultconf="default->default">

<!-- Pentaho (internal) dependencies -->
<dependency org="pentaho-kettle" name="kettle-engine" rev="${dependency.kettle.revision}" transitive="false" conf="default->default" changing="true" />
<dependency org="pentaho-kettle" name="kettle-core" rev="${dependency.kettle.revision}" transitive="false" conf="default->default" changing="true" />
<dependency org="pentaho-kettle" name="kettle-ui-swt" rev="${dependency.kettle.revision}" transitive="false" conf="default->default" changing="true" />
<dependency org="pentaho" name="pentaho-xul-core" rev="${dependency.pentaho-xul.revision}" changing="true"/>
<dependency org="pentaho" name="pentaho-xul-swt" rev="${dependency.pentaho-xul.revision}" changing="true"/>
<dependency org="pentaho" name="metastore" rev="${dependency.pentaho-metastore.revision}" changing="true" />

<!-- Third-party (external) dependencies -->
<dependency org="org.eclipse.swt.win32.win32" name="x86" rev="3.3.0-v3346" transitive="false" />
<dependency org="org.eclipse" name="jface" rev="3.3.0-I20070606-0010" transitive="false" />

<dependency org="pentaho-kettle" name="kettle-engine" rev="${dependency.kettle.revision}" transitive="false" conf="default->default"
changing="true" />
<dependency org="pentaho-kettle" name="kettle-core" rev="${dependency.kettle.revision}" transitive="false" conf="default->default"
changing="true" />
<dependency org="pentaho-kettle" name="kettle-ui-swt" rev="${dependency.kettle.revision}" transitive="false" conf="default->default"
changing="true" />
<dependency org="pentaho" name="pentaho-xul-core" rev="${dependency.pentaho-xul.revision}" changing="true" />

<dependency org="pentaho" name="metastore" rev="${dependency.pentaho-metastore.revision}" changing="true" />

<dependency org="com.google.guava" name="guava" rev="${dependency.guava.revision}" transitive="false" conf="default->default" changing="false"/>
<dependency org="org.apache.thrift" name="libthrift" rev="${dependency.libthrift.revision}" transitive="false" conf="default->default"
changing="false"/>
<dependency org="org.apache.thrift" name="libthrift" rev="${dependency.libthrift.revision}" transitive="false" conf="default->default" changing="false"/>
<dependency org="org.apache.cassandra" name="cassandra-all" rev="${dependency.apache-cassandra.revision}" transitive="false" conf="default->default" changing="false"/>
<dependency org="org.apache.cassandra" name="cassandra-thrift" rev="${dependency.apache-cassandra-thrift.revision}" transitive="false" conf="default->default" changing="false"/>

<dependency org="org.yaml" name="snakeyaml" rev="${dependency.snakeyaml.revision}" transitive="false" conf="default->default" changing="false"/>
<dependency org="org.slf4j" name="slf4j-api" rev="1.5.8" transitive="false" conf="default->default" changing="false"/>
<dependency org="org.slf4j" name="slf4j-log4j12" rev="1.5.8" transitive="false" conf="default->default" changing="false"/>
<dependency org="log4j" name="log4j" rev="1.2.14" />

<dependency org="log4j" name="log4j" rev="1.2.14" />

<dependency org="junit" name="junit" rev="4.5" conf="test->default" />
<!-- for testing -->
<dependency org="org.mockito" name="mockito-all" rev="1.9.5" conf="test->default" transitive="false" />
<dependency org="junit" name="junit" rev="4.5" conf="test->default" />
<dependency org="org.apache.commons" name="commons-vfs2" rev="2.1-20150824" conf="test->default" />
<dependency org="pentaho-kettle" name="kettle-engine" rev="${dependency.kettle.revision}" transitive="true" conf="test->default" changing="true" />
<dependency org="pentaho-kettle" name="kettle-engine-test" rev="${dependency.kettle.revision}" transitive="false" conf="test->default" />

<dependency org="pentaho-kettle" name="kettle-core" rev="${dependency.kettle.revision}" transitive="true" conf="test->default" changing="true" />
</dependencies>
</ivy-module>
171 changes: 118 additions & 53 deletions src/org/pentaho/di/trans/steps/cassandrasstableoutput/SSTableOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Big Data
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -22,11 +22,15 @@

package org.pentaho.di.trans.steps.cassandrasstableoutput;

import java.io.File;
import java.net.URI;
import java.security.Permission;
import java.util.HashMap;
import java.util.Map;

import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
Expand All @@ -36,81 +40,74 @@
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.cassandrasstableoutput.writer.AbstractSSTableWriter;
import org.pentaho.di.trans.steps.cassandrasstableoutput.writer.SSTableWriterBuilder;

/**
* Output step for writing Cassandra SSTables (sorted-string tables).
*
*
* @author Rob Turner (robert{[at]}robertturner{[dot]}com{[dot]}au)
* @author Mark Hall (mhall{[at]}pentaho{[dot]}com)
*/
public class SSTableOutput extends BaseStep implements StepInterface {

protected SSTableOutputMeta m_meta;
protected SSTableOutputData m_data;

public SSTableOutput( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans ) {

super( stepMeta, stepDataInterface, copyNr, transMeta, trans );
}

private static final SecurityManager sm = System.getSecurityManager();
/** The number of rows seen so far for this batch */
protected int rowsSeen;

/** The directory to output to */
protected String directory;

/** The keyspace to use */
protected String keyspace;

/** The name of the column family (table) to write to */
protected String columnFamily;

/** The key field used to determine unique keys (IDs) for rows */
protected String keyField;

/** Size (MB) of write buffer */
protected String bufferSize;

/** Writes the SSTable output */
protected SSTableWriter writer;

protected AbstractSSTableWriter writer;
/** Used to determine input fields */
protected RowMetaInterface inputMetadata;

/** List of field names (optimization) */
private String[] fieldNames;

/** List of field indices (optimization) */
private int[] fieldValueIndices;

private void initialize( StepMetaInterface smi, StepDataInterface sdi ) throws Exception {
/**
 * Standard Kettle step constructor; simply delegates to the {@code BaseStep} superclass.
 *
 * @param stepMeta          step metadata describing this step instance
 * @param stepDataInterface runtime data holder for this step
 * @param copyNr            copy number of this step (for parallel step copies)
 * @param transMeta         metadata of the owning transformation
 * @param trans             the running transformation
 */
public SSTableOutput( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans ) {

super( stepMeta, stepDataInterface, copyNr, transMeta, trans );
}

private void initialize( SSTableOutputMeta smi ) throws Exception {
first = false;
rowsSeen = 0;
m_meta = (SSTableOutputMeta) smi;
m_data = (SSTableOutputData) sdi;
inputMetadata = getInputRowMeta();

String yamlPath = environmentSubstitute( m_meta.getYamlPath() );
String yamlPath = environmentSubstitute( smi.getYamlPath() );
String directory = environmentSubstitute( smi.getDirectory() );
String keyspace = environmentSubstitute( smi.getCassandraKeyspace() );
String columnFamily = environmentSubstitute( smi.getColumnFamilyName() );
String keyField = environmentSubstitute( smi.getKeyField() );
String bufferSize = environmentSubstitute( smi.getBufferSize() );

if ( Const.isEmpty( yamlPath ) ) {
throw new Exception( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.NoPathToYAML" ) );
}
logBasic( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Message.YAMLPath", yamlPath ) );

System.setProperty( "cassandra.config", "file:" + yamlPath );
File outputDir;
if ( Const.isEmpty( directory ) ) {
outputDir = new File( System.getProperty( "java.io.tmpdir" ) );
} else {
outputDir = new File( new URI( directory ) );
}

if ( !outputDir.exists() ) {
if ( !outputDir.mkdirs() ) {
throw new KettleException( BaseMessages.getString( SSTableOutputMeta.PKG,
"SSTableOutput.Error.OutputDirDoesntExist" ) );
}
}

directory = environmentSubstitute( m_meta.getDirectory() );
keyspace = environmentSubstitute( m_meta.getCassandraKeyspace() );
columnFamily = environmentSubstitute( m_meta.getColumnFamilyName() );
keyField = environmentSubstitute( m_meta.getKeyField() );
bufferSize = environmentSubstitute( m_meta.getBufferSize() );
if ( Const.isEmpty( columnFamily ) ) {
throw new KettleException( BaseMessages.getString( SSTableOutputMeta.PKG,
"SSTableOutput.Error.NoColumnFamilySpecified" ) );
}

if ( Const.isEmpty( keyField ) ) {
throw new KettleException( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.NoKeySpecified" ) );
}

// what are the fields? where are they?
fieldNames = inputMetadata.getFieldNames();
fieldValueIndices = new int[fieldNames.length];
Expand All @@ -121,13 +118,41 @@ private void initialize( StepMetaInterface smi, StepDataInterface sdi ) throws E
if ( writer != null ) {
writer.close();
}
writer = new SSTableWriter();
writer.setDirectory( directory );
writer.setKeyspace( keyspace );
writer.setColumnFamily( columnFamily );
writer.setKeyField( keyField );
writer.setBufferSize( Integer.parseInt( bufferSize ) );
writer.init();

SSTableWriterBuilder builder =
new SSTableWriterBuilder().withConfig( yamlPath ).withDirectory( outputDir.getAbsolutePath() ).withKeyspace(
keyspace ).withColumnFamily( columnFamily ).withRowMeta( getInputRowMeta() ).withKeyField( keyField )
.withCqlVersion( smi.getUseCQL3() ? 3 : 2 );
try {
builder.withBufferSize( Integer.parseInt( bufferSize ) );
} catch ( NumberFormatException nfe ) {
logBasic( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Message.DefaultBufferSize" ) );
}

writer = builder.build();
try {
disableSystemExit( sm, log );
writer.init();
} catch ( Exception e ) {
throw new RuntimeException( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.InvalidConfig" ),
e );
} finally {
// Restore original security manager if needed
if ( System.getSecurityManager() != sm ) {
System.setSecurityManager( sm );
}
}
}

/**
 * Installs a {@code NoSystemExitDelegatingSecurityManager} so that Cassandra code cannot
 * terminate the JVM while the SSTable writer is being initialized.
 *
 * @param sm  the security manager in effect before the swap (may be {@code null}); all
 *            permission checks other than exit are delegated to it
 * @param log channel used to report the failure when the current security manager refuses
 *            to be replaced
 */
void disableSystemExit( SecurityManager sm, LogChannelInterface log ) {
// Workaround for JVM exit triggered by org.apache.cassandra.config.DatabaseDescriptor in case
// of any issue with the Cassandra config: block System.exit() for the duration of writer
// initialization, or at least give the user a clue if the manager cannot be installed.
try {
System.setSecurityManager( new NoSystemExitDelegatingSecurityManager( sm ) );
} catch ( SecurityException se ) {
// The existing security manager denied the swap; log the problem and carry on unprotected.
log.logError( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.JVMExitProtection" ), se );
}
}

@Override
Expand All @@ -136,32 +161,42 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
if ( isStopped() ) {
return false;
}

Object[] r = getRow();

if ( first ) {
try {
initialize( (SSTableOutputMeta) smi );
} catch ( Exception e ) {
throw new KettleException(
BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.WriterInitFailed" ), e );
}
}

try {
if ( r == null ) {
// no more output - clean up/close connections
setOutputDone();
closeWriter();
return false;
}
if ( first ) {
initialize( smi, sdi );
}
// create record
Map<String, Object> record = new HashMap<String, Object>();
for ( int i = 0; i < fieldNames.length; i++ ) {
Object value = r[fieldValueIndices[i]];
if ( SSTableWriter.isNull( value ) ) {
if ( value == null || "".equals( value ) ) {
continue;
}
record.put( fieldNames[i], value );
}
// write it
writer.processRow( record );
incrementLinesWritten();
} catch ( Exception e ) {
logError( BaseMessages.getString( SSTableOutputMeta.PKG, "SSTableOutput.Error.FailedToProcessRow" ), e );
// single error row - found it!
putError( getInputRowMeta(), r, 1L, e.getMessage(), null, "ERR_SSTABLE_OUTPUT_01" );
incrementLinesRejected();
}

// error will occur after adding it
Expand All @@ -187,4 +222,34 @@ public void closeWriter() {
}
}
}

private class JVMShutdownAttemptedException extends SecurityException {
}

private class NoSystemExitDelegatingSecurityManager extends SecurityManager {
private SecurityManager delegate;

NoSystemExitDelegatingSecurityManager( SecurityManager delegate ) {
this.delegate = delegate;
}

@Override
public void checkPermission( Permission perm ) {
if ( delegate != null ) {
delegate.checkPermission( perm );
}
}

@Override
public void checkPermission( Permission perm, Object context ) {
if ( delegate != null ) {
delegate.checkPermission( perm, context );
}
}

@Override
public void checkExit( int status ) {
throw new JVMShutdownAttemptedException();
}
}
}
Loading

0 comments on commit ca08ec8

Please sign in to comment.