How to detect unprocessed files in Hadoop development with Python, HBase, and Pig


In this post, Hadoop development professionals share a guide that helps detect unprocessed data files in HDFS using Python, Pig, and HBase. Read on to see how such files can be detected.

In a real big data application, we need to process data hourly or daily. Therefore, we need a way to detect which data files have already been processed; this reduces processing time and prevents duplication in the processed data set.

Environment: Java JDK 1.7

Cloudera version: CDH5.4.7, please refer to this link: http://www.cloudera.com/downloads/cdh/5-4-7.html

Initial steps

1. We need to prepare some input data files. Use the vi tool to create two local files:

  1. vi file1, containing:
    Jack
    Ryan
    Jean
  2. vi file2, containing:
    Candy
    Bill

2. We need to put the local files into the Hadoop Distributed File System (HDFS) with these commands:

hadoop fs -mkdir -p /data/mydata/sample
hadoop fs -put file1 /data/mydata/sample/
hadoop fs -put file2 /data/mydata/sample/
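To confirm that both files landed in HDFS, you can list the target directory, for example:

hadoop fs -ls /data/mydata/sample/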

3. Create the HBase table that stores the last processed time (the Python script below expects a table named processedHistory with a column family cf).
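A minimal Jython sketch for this step, assuming the table name processedHistory and column family cf that the Python script below expects (the same table can also be created from the HBase shell with create 'processedHistory', 'cf'):

# Sketch: create the processedHistory table with column family 'cf'
from org.apache.hadoop.hbase import HBaseConfiguration, HTableDescriptor, HColumnDescriptor, TableName
from org.apache.hadoop.hbase.client import HBaseAdmin

conf = HBaseConfiguration.create()
admin = HBaseAdmin(conf)
if not admin.tableExists(TableName.valueOf('processedHistory')):
    descriptor = HTableDescriptor(TableName.valueOf('processedHistory'))
    descriptor.addFamily(HColumnDescriptor('cf'))
    admin.createTable(descriptor)
admin.close()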


Code walkthrough

This is the Pig script that loads the data and simply dumps it to the command line, to prove that we only process new files.

Note: Please note that this Pig script will compile to a MapReduce job that processes the data in parallel.

/*
 * Load the data from the HDFS locations in the unprocessed file list,
 * which is passed in from the Python script
 */
rawData = LOAD '$filesList' USING PigStorage(';') AS (id:chararray, name:chararray);

/*
 * Output the data to the command line
 */
dump rawData;
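If you want to test the Pig script on its own before wiring it up to the Python script, you can pass a file list manually through Pig's parameter substitution; for example, assuming the sample path from the initial steps:

pig -param filesList=/data/mydata/sample/file1 processFile.pig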

This is the Python script, which uses the Hadoop, HBase, and Pig libraries to detect unprocessed files and pass them to the Pig script:

  • 0. We need to import the org.apache.hadoop.* and org.apache.pig.* packages in the Python script.
  • 1. Access HBase to get the latest time at which data was already processed.
  • 2. Loop over all files in the input HDFS location and compare each file's modification time with the value read from HBase.
  • 3. If a file's modification time is greater than the value from HBase, add the file to a list of unprocessed files and pass this list to Pig to process the data. If no newer files are found, stop without launching the MapReduce job, because there is nothing new to process.
  • 4. Update the latest processed time in HBase again so it can be used in future runs.

# Imports from the Hadoop, HBase, and Pig Java libraries (available because Pig runs this script with Jython)
from org.apache.hadoop.hbase import HBaseConfiguration
from org.apache.hadoop.hbase.client import HTable, Put, Get
from org.apache.hadoop.hbase.util import Bytes
from org.apache.hadoop.fs import FileSystem, Path
from org.apache.pig.scripting import Pig

# We create a connection to HBase with the default configuration.
# We also need to define the column family and column qualifier.
configurationSetting = HBaseConfiguration.create()
processHistoryTable = None
fs = FileSystem.get(configurationSetting)
cf = 'cf'
qlf = 'processedTime'

# This function connects to the HBase table named processedHistory
def accessHBaseTable():
    print 'Connecting to HBase Server ...'
    global processHistoryTable
    processHistoryTable = HTable(configurationSetting, 'processedHistory')

# This method puts the latest process time to HBase
def putData(processHistory, rowkey, value):
    putList = []
    put = Put(Bytes.toBytes(rowkey))
    put.add(Bytes.toBytes(cf), Bytes.toBytes(qlf), Bytes.toBytes(str(value)))
    putList.append(put)
    processHistory.put(putList)

# This method reads the raw bytes for a key from HBase
def checkBytes(key, processHistory):
    getBytes = Get(Bytes.toBytes(key))
    getBytes.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qlf))
    resultValue = processHistory.get(getBytes)
    return resultValue.getValue(Bytes.toBytes(cf), Bytes.toBytes(qlf))

# This method gets the value from HBase for the input key as a string
def getValueByKey(key, processHistory):
    stringValue = checkBytes(key, processHistory)
    if stringValue == None or stringValue == '':
        return ''
    return Bytes.toString(stringValue, 0, len(stringValue))

# This method detects the latest files and the modification time of the last new file
def detectLatestFiles(fs, inputRawData, oldTime):
    fileList = []
    hdfsPath = Path(inputRawData)
    hdfsFilesList = fs.listStatus(hdfsPath)
    newTime = 0
    for singleFile in hdfsFilesList:
        currentTime = singleFile.getModificationTime()
        if currentTime > oldTime:
            newTime = currentTime
            print "List all the latest files to process: " + str(singleFile.getPath())
            print "Latest current process time: " + str(newTime)
            fileList.append(str(singleFile.getPath()))
    return newTime, fileList

# This method passes the list of unprocessed files from Python to the Pig script,
# which processes the data in parallel
def runPigScript():
    oldTime = getValueByKey('checkRaw', processHistoryTable)
    print 'Old process time: ' + str(oldTime)
    tempTime = 0L
    if oldTime:
        tempTime = long(oldTime)
    newTime, filesList = detectLatestFiles(fs, '/data/mydata/sample/', tempTime)
    if long(newTime) > 0:
        print 'Detected new files to process....'
        try:
            paramsToPig = {'filesList': str(filesList)[1:-1]}
            P = Pig.compileFromFile('processFile.pig')
            processData = P.bind(paramsToPig).runSingle()
            putData(processHistoryTable, 'checkRaw', newTime)
            print 'Put the latest process time to HBase: ' + str(newTime)
        except:
            print 'Map Reduce Job is failed!!!'
    else:
        print 'Cannot detect new files to process. Stopped the Map Reduce Job!!!'

# Main entry point
if __name__ == '__main__':
    accessHBaseTable()
    runPigScript()
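One possible refinement, not part of the original script: runSingle() returns a PigStats object, so the job status can be checked explicitly instead of relying only on the bare except, roughly like this:

# Sketch: check the embedded Pig job status before updating HBase
processData = P.bind(paramsToPig).runSingle()
if processData.isSuccessful():
    putData(processHistoryTable, 'checkRaw', newTime)
    print 'Put the latest process time to HBase: ' + str(newTime)
else:
    print 'Map Reduce Job is failed!!!'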

Verify the result

0. Run the script with this command:

pig checkLatestProcessedFile.py
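Note that Pig recognizes the .py extension and runs the script through its embedded Jython engine, which is what allows the Hadoop, HBase, and Pig Java classes to be imported directly from Python. Depending on your environment, the HBase client jars may also need to be on the Pig classpath.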

1. Check the log output when the MapReduce job finishes.


2. The first time, right after we put the two sample files, you can see that Pig picks up exactly those two files to process and dumps the output data.

3. You can verify the latest processed time stored in HBase, for example by scanning the processedHistory table from the HBase shell (scan 'processedHistory').


4. Once the data has been processed, if we run the code again without any new files, we get a message saying that no new files were detected and the script stops.


5. Now, to verify that a newly arriving file, “file3”, is handled, we create a new file and put it into HDFS using the same steps as in the “Initial steps” section (a sketch of the commands is shown below).
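Following the same pattern as in the initial steps (the file contents are up to you):

vi file3
hadoop fs -put file3 /data/mydata/sample/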

6. When we run the script again with the same command as in step 0, we see that only file3 is processed by the Pig script.

7. The project structure needs just two files next to each other: the Python script (checkLatestProcessedFile.py) and the Pig script (processFile.pig).


Hadoop development experts hope this guide helps you detect unprocessed files in HDFS with Python, HBase, and Pig. You can also read more about Pig, HBase, and Python to understand the process better. If you have any queries, ask them and clear your doubts.

We hope this blog helps you detect unprocessed files, so that you do not end up with duplicated data when processing raw data.