How to patrol (tail) a file, like Filebeat in the Elastic Stack

4 minute read

FilePatrol

example

// Patrol "fileName.log"; "yyyyMMdd" is the date pattern used to resolve the current file name.
FilePatrol filePatrol = new FilePatrol("fileName.log", "yyyyMMdd");

// Every non-empty line read from the file is handed to this callback.
filePatrol.setCallback(new FilePatrol.ICallback() {
  @Override
  public void onMessage(String msg) {
    System.out.println(msg);
  }
});

// FilePatrol is a Runnable: run it on its own thread.
Thread thread1 = new Thread(filePatrol);
thread1.start();

FilePatrol.java

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;

/**
 * Patrol file and read them
 */
public class FilePatrol implements Runnable {

    String VERSION = "FilePatrol ver.0.02";

    @Override
    public void run() {
        LOG.write(VERSION);
        startPatrol();
    }

    /**
     * Callback interface for sending
     */
    public interface ICallback {
        public void onMessage(String msg);
    }

    private ICallback mCallback;
    private String mFilename = null;
    private String mDatePattern = null;
    private String mFileKey = "";

    /**
     * Creator
     * @param fname Full Path
     */
    public FilePatrol(String fname) {
        this.mFilename = fname;
    }

    /**
     * Creator - Check file path and pattern
     * @param fname Full path including filename
     * @param datepattern "yyyyMMdd"
     */
    public FilePatrol(String fname, String datepattern) {
        this.mCallback = null;
        this.mFilename = fname;
        this.mDatePattern = datepattern;
    }

    /**
     * Creator - add filekey for logging
     * @param fname
     * @param datepattern
     * @param filekey
     */
    public FilePatrol(String fname, String datepattern, String filekey) {
        this.mCallback = null;
        this.mFilename = fname;
        this.mDatePattern = datepattern;
        this.mFileKey = filekey;
    }

    /**
     * Register callback
     * @param callback
     */
    public void setCallback(ICallback callback) {
        this.mCallback = callback;
    }

    /**
     * Get iNode
     * @param fname
     * @return
     */
    public String getInode(String fname) {
        String rtn = null;
        BasicFileAttributes attr = null;

        try {
            Path path = Paths.get(fname);
            attr = Files.readAttributes(path, BasicFileAttributes.class);
            Object fileKey = attr.fileKey();
            String s = fileKey.toString(); // Ex : "(dev=fd01,ino=419507)"
            rtn = s.substring(s.indexOf("ino=") + 4, s.indexOf(")"));
        } catch (Exception e) {
            LOG.error(String.format("%s Failed to get iNode: %s", mFileKey, e.getMessage()));
        }
        return rtn;
    }

    /**
     * Start
     */
    public void startPatrol() {

        String msg = null;
        boolean isRunning = true;
        String _curfilename = UTILS.getDateFileName(this.mFilename, mDatePattern);
        String _filename = _curfilename;
        long mCtime = 0, mPtime = 0;
        String _curiNode = null;
        String _preiNode = null;
        Reader fileReader = null;

        char[] buffer = new char[102400];
        int bufidx = 0;
        int singleCh = 0;
        // -----------------------------
        // Main Loop
        // -----------------------------
        _curfilename = UTILS.getDateFileName(this.mFilename, mDatePattern); // get current filename
        while (isRunning) {
            // After mv check if iNode changed or not
            mCtime = System.currentTimeMillis();
            if (mCtime - mPtime > 5000) {
                _curfilename = UTILS.getDateFileName(this.mFilename, mDatePattern); // get current filename
                mPtime = mCtime;

                // Check the file status
                _curiNode = getInode(_curfilename);
                // First open
                if(( _preiNode == null) && (_curiNode != null))
                    _preiNode = _curiNode;

                if (_curiNode == null) {
                    try {
                        Thread.sleep(3000);
                        continue;
                    } catch (Exception e) {
                    }
                }
                else {
                    if (!_curiNode.equals(_preiNode)) {
                        LOG.error(String.format("%s [%s]Current iNode: [%s] -> [%s]", mFileKey, _curfilename, _preiNode, _curiNode));
                        _preiNode = _curiNode;
                        // Read from the beginning
                        setCurrFilePos(_curfilename,0);
                        // Close previous buffer ald reopen
                        if (fileReader != null) {
                            try {
                                fileReader.close();
                                fileReader = null;
                            } catch (Exception e) {
                                LOG.error(String.format("[%s] read Exception : %s", _curfilename, e.getMessage()));
                                fileReader = null;
                            }
                        }
                    }
                }
            }

            // ------------------------------------------
            // Check filename (date changed)
            // ------------------------------------------
            if (!_filename.equals(_curfilename)) {

                LOG.info(String.format("%s File changed:[%s]->[%s]", mFileKey, _filename, _curfilename));

                _filename = _curfilename;

                // Read from the beginning when open the file
                setCurrFilePos(_curfilename,0);
                // Close previous buffer and reopen
                if (fileReader != null) {
                    try {
                        fileReader.close();
                        fileReader = null;
                    } catch (Exception e) {
                        LOG.error(String.format("[%s] read Exception : %s", _filename, e.getMessage()));
                        fileReader = null;
                    }
                }
            }

            // ------------------------------------------
            // Open file
            // ------------------------------------------
            if (fileReader == null) {
                try {
                    long _filelen = 0;

                    LOG.info(String.format("%s [%s]Open FileReader", mFileKey, _filename));
                    fileReader = new FileReader(_filename);

                    // Get file size
                    File file = new File(_filename);
                    if (file.exists())
                        _filelen = file.length();
                    LOG.info(String.format("%s [%s]File length:[%d]", mFileKey, _filename, _filelen));

                    // Move to the end of the file
                    if (mStartFilePos == -1) {
                        LOG.info(String.format("%s [%s]Move to the end of file", mFileKey, _filename));
                        fileReader.skip(_filelen);
                    } else {
                        mCurrFilePos = readPosNumber(_filename);
                        if (mCurrFilePos <= _filelen) {
                            LOG.info(String.format("%s [%s]Move to the position [%d] (FileLen:%d)", mFileKey, _filename, mCurrFilePos, _filelen));
                            fileReader.skip(mCurrFilePos);
                        } else {
                            LOG.error(String.format("%s [%s]File position error [%d] (FileLen:%d)", mFileKey, _filename, mCurrFilePos, _filelen));
                        }
                    }
                } catch (FileNotFoundException e) {
                    LOG.error(String.format("%s [%s]FileNotFoundException: %s", mFileKey, _filename, e.getMessage()));
                } catch (Exception e) {
                    LOG.error("Exception: " + e.getMessage());
                }
                // ------------------------------------------
                // Check if opened
                // ------------------------------------------
                if (fileReader == null) {
                    try {
                        LOG.error(String.format("%s [%s]File is NOT ready", mFileKey, _filename));
                        Thread.sleep(3000);
                        // Set position to 0 when starts reading
                        setCurrFilePos(_filename,0);
                        continue;
                    } catch (Exception e) {
                        LOG.error(String.format("%s sleep Error: %s", mFileKey, e.getMessage()));
                        break;
                    }
                }
            }

            // ------------------------------------------
            // Read file
            // ------------------------------------------
            try {
                bufidx = 0;
                mReadCnt = 0;
                while((singleCh = fileReader.read()) != -1) {
                    mReadCnt++;
                    if ((char)singleCh == '\r' || (char)singleCh == '\n') {
                        if (bufidx > 0) {
                            char newArr[] = new char[bufidx];
                            for (int i = 0; i < bufidx; i++) {
                                newArr[i] = buffer[i];
                            }
                            msg = new String(newArr);
                            if (mCallback != null && msg != null) {
                                mCallback.onMessage(msg); // Send data
                            }
                            bufidx = 0; // Sent done
                            msg = null;
                            continue; // continues to read
                        } else {
                            continue; // Skip (\r\n)
                        }
                    }
                    buffer[bufidx++] = (char)singleCh;
                }

                // Update current position
                if (mReadCnt > 0)
                    setCurrFilePos(_filename, mCurrFilePos + mReadCnt);

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    LOG.error(String.format("[%s] InterruptedException : %s", _filename, e.getMessage()));
                }

                continue;

            } catch (FileNotFoundException e) {
                LOG.error(String.format("[%s] read FileNotFoundException : %s", _filename, e.getMessage()));
                fileReader = null;
            } catch(IOException e) {
                LOG.error(String.format("[%s] read IOException : %s", _filename, e.getMessage()));
                fileReader = null;
            } catch (Exception e) {
                LOG.error(String.format("[%s] read Exception : %s %s", _filename, e.getMessage(), e.getStackTrace()));
                fileReader = null;
            }
        }
        // Stop thread
        Thread.currentThread().interrupt();

    } /* End of start */
} /* End of class */

Tags:

Categories:

Updated:

Leave a comment