Flume自定义组件之SqlServerSink

自定义的sink需要继承Flume的AbstractSink类,最好同时实现Configurable接口。实现该接口以后,自定义组件中的参数就可以通过配置文件来设置。

process方法定义了事件的处理逻辑。configure方法用于获取组件的自定义参数。

所有可配置参数的值都通过configure方法获取。对于不需要用户配置、但程序运行又必需的参数,可以以实例成员变量的方式给出。通过这种灵活的手段,可以让一些参数既允许用户配置,又具有默认值。具体方法是:在configure方法中先获取用户配置的值,如果该值不合法或不合逻辑,就使用程序中提供的默认值。比如SqlServerSink组件的batchSize参数,既可以由用户设置,也有默认值(500)。

process方法和configure方法是必须实现的。

start方法可以重写。在该方法中进行SQL Server连接的初始化,比如加载驱动、获得连接。为了提高程序的运行效率,插入数据时采用批量提交,所以在start方法中将JDBC的自动提交功能关闭。

destroyConnection方法是自定义方法,该方法在程序遇到异常或者终止(SqlServerSink的stop方法被调用)的时候,用于释放本地资源。

toString方法用于输出关于自定义Sink组件的一些信息。

代码中有部分逻辑需要读者自己实现,例如:生成sql语句的方法;批量处理失败时,是否需要将此类信息通过email或其他方式通知相关人员。注意:如果sql语句由事件内容拼接而成,需要对内容进行转义或校验,以防SQL注入。

整个过程结构清晰,不需要过多解释,如有不明白,欢迎留言。

代码如下:

package cn.ancony.flumeSink; 

import com.google.common.collect.Lists;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.List;

public class SqlServerSink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(SqlServerSink.class);
    private static final int DEFAULT_BATCHSIZE = 500;

    private String url;//必须配置的字段
    private String user;//必须配置的字段
    private String password;//必须配置的字段

    private Integer batchSize;//可选配置的字段

    private Connection connection;
    private Statement statement;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();

        List
 
   actions = Lists.newArrayList();
        try {
            transaction.begin();
            // This try clause includes whatever Channel/Event operations you want to do
            long processedEvent = 0;
            for (; processedEvent < batchSize; processedEvent++) {
                // Receive new data
                Event event = channel.take();
                if (event == null) {
                    status = Status.BACKOFF;
                    break;
                } else {
                    //TODO 得到事件的header和body,
                    //TODO 如果需要事件的header信息,按照如下的方式获取,前提是在source中配置了header的信息。
                    //String file = event.getHeaders().get("file");
                    //LOG.info("file is: " + file);
                    //String body = new String(event.getBody());
                    //LOG.info("body is: " + body);

                    //TODO 根据header和body进行处理,得到需要执行的sql语句。
                    //TODO 得到需要的sql语句,根据实际情况自定义
                    String sql = null;
                    //sql = SqlServerUtils.getSQL(file, body);
                    LOG.info("sql is: " + sql);
                    if (sql == null) {
                        continue;
                    }
                    //TODO 将执行的语句放入set之中。
                    actions.add(sql);
                }
            }

            if (actions.size() > 0) {
                for (String sql : actions) {
                    statement.addBatch(sql);
                    LOG.info("add sql to batch:{}", sql);
                }
                statement.executeBatch();
                connection.commit();
            }
            status = Status.READY;
            transaction.commit();
        } catch (ChannelException e) {
            transaction.rollback();
            LOG.error("Unable to get event from channel. Exception follows.", e);
            status = Status.BACKOFF;
        } catch (BatchUpdateException e) {
            transaction.rollback();
            //将异常写入Jermey的报警
  系统
            //TODO 如果批量提交失败,请提供自己的处理逻辑
            //SqlServerUtils ssu = new SqlServerUtils();
            //ssu.alarm(e);
            LOG.error("Batch update exception, is: " + e + ", it's stack trace is :");
            e.printStackTrace();
        } catch (Exception e) {
            transaction.rollback();
            LOG.error("Unable to communicate with sqlserver server. Exception follows.", e);
            status = Status.BACKOFF;
            destroyConnection();
        } finally {
            transaction.close();
        }
        return status;
    }

    //获得
  数据库的连接,并且初始化statement.
    @Override
    public synchronized void start() {
        LOG.info("SqlServerSink start...");
        try {
            //1.加载驱动
            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
            LOG.info("sqlserver driver load success.");
        } catch (Exception e) {
            e.printStackTrace();
            LOG.error("Unable to load sqlserver driver using com.microsoft.sqlserver.jdbc.SQLServerDriver. exception is: " + e);
            throw new RuntimeException(e);
        }
        try {
            //2.获得连接
            connection = DriverManager.getConnection(url, user, password);
            //TODO 3.准备statement。是否需要一些参数。
            statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY);
            //关闭自动提交
            connection.setAutoCommit(false);
        } catch (Exception e) {
            LOG.error("Unable to create sqlserver connection using url="
                    + url + ", user=" + user + ", password=" + password + ".exception is: " + e);
            /* Try to prevent leaking resources. */
            destroyConnection();
            throw new RuntimeException(e);
        }
        super.start();
        LOG.info("SqlServerSink sink {} started", this.getName());
    }

    //连接的销毁。一个是销毁statement,另外一个是销毁connection.
    private void destroyConnection() {
        if (statement != null) {
            try {
                statement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        statement = null;
        if (connection != null) {
            LOG.debug("Destroying connection to: {}", url);
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        connection = null;
    }

    //sqlServer在停止的时候需要销毁与数据库的连接
    @Override
    public synchronized void stop() {
        LOG.info("sqlserver sink {} stopping", getName());
        destroyConnection();
        super.stop();
        LOG.info("sqlserver sink {} stopped", this.getName());
    }

    @Override
    public void configure(Context context) {
        url = context.getString("url");
        if (url == null) {
            throw new IllegalArgumentException("url config setting is not " +
                    "specified for sink " + getName());
        }
        user = context.getString("user");
        if (user == null) {
            throw new IllegalArgumentException("user config setting is not " +
                    "specified for sink " + getName());
        }
        password = context.getString("password");
        if (password == null) {
            throw new IllegalArgumentException("password config setting is not " +
                    "specified for sink " + getName());
        }
        //对于非必须配置的值,先读取配置文件,如果有值就读取,如果没有值,就使用默认的值。并作为一个警告。
        batchSize = context.getInteger("batchSize", DEFAULT_BATCHSIZE);
        if (batchSize < 0) {
            LOG.warn(getName() + ". batchSize must be  positive number. Defaulting to "
                    + DEFAULT_BATCHSIZE);
            batchSize = DEFAULT_BATCHSIZE;
        }
    }

    @Override
    public String toString() {
        return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
                " }";
    }
}
 
Leave A Comment