What is Embulk?
Concept
Embulk is an open source tool whose basic function is to load records from one database and import them to another. In addition, there are functions to import data into other databases through the use of plugins that make the conversion and data processing simpler and more convenient.
Embulk works on java platform, so it can easily work on many different operating systems.
Advantages
- Simple installation, works on many different operating systems
- The functions are provided in the form of plugins, so for each specific work there will be the necessary plugins.
- Plugins are always updated continuously from the developer
- Embulk and its plugins are provided for free and allow users to freely customize them to suit individual requirements.
Structure of Embulk
Basically, the structure of embulk is divided into 3 main parts:
- Input plugins data, Decoder plugins, Parser plugins: Provide different input data methods. example: mysql, postgresql, Amazon S3, HDFS, http (get data from api)
- Data processing plugins: Provide plugins that allow for data filtering (filter plugins)
- Output plugins data, Formater plugins, Encoder plugins
Some plugins:
input plugins :
- RDBS (mysql, postgres, jdbc …)
- NoSQL (redis, mongodb)
- Cloud Service (redshift, s3)
- Files (CSV, JSON …)
- Etc (hdfs, http, elastic search, slack-history, google analitics)
output plugins :
- RDBS (mysql, postgres, oracle, jdbc …)
- Cloud Service (redshift, s3, bigquery)
- NoSQL (redis, hdfs)
- Files
- Etc (elastic search, hdfs, swift)
filter plugins :
- column (cut the column)
- insert Add columns such as host name to the specified location
- row Extract only rows that meet certain conditions
- rearrange Reconstructs one row of data into multiple rows
File parser plugins :
- json
- xml
- csv
- apache log
- query_string
- regex
File formatter Plugin :
- json
- A plugin that formats the contents of a record in the format of jsonl (1 json 1 line)
- poi_excel
- Plugin to convert to Excel (xls, xlsx) format data
Excutor Plugin :
- mapreduce
- Plugin for running Embulk tasks on Hadoop
Details of these plugins can be found here: https://plugins.embulk.org/ . There are many useful flugins here.
How it works
In case of data from database -> other database. We have the following facilities:
The data will be read by input plugin -> data processing -> output plugin will import into new database.
In the more general case. Data can be read from a file or from some specific data type
Install Embulk
To install Embulk, you first need to install java on your device. Note that Embulk only works on java8 for now .
1 2 3 | sudo apt update sudo apt install openjdk-8-jre-headless |
Install embulk
1 2 3 4 5 | curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar" chmod +x ~/.embulk/bin/embulk echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc source ~/.bashrc |
Install plugin for embulk
1 2 | embulk gem install embulk-output-postgresql embulk-filter-column embulk-filter-add_time embulk-input-postgresql embulk-filter-ruby_proc embulk-input-http embulk-parser-jsonpath embulk-filter-eval |
Above is to install some plugin for importing data from csv or api json -> into database postgresql. For other purposes, see the plugins above.
Run embulk with config file
1 2 | embulk run --log-level warn config.yml.liquid |
Some config examples for Embulk
Import data from csv -> into database postgresql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | in: type: file path_prefix: /var/batch/data/csv_year_generic_medicine_export_pjx.csv parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' null_string: '' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: true allow_optional_columns: true columns: - {name: personal_id, type: string} - {name: 対象年度, type: string} - {name: ジェネリック医薬品数量, type: long} - {name: 置き換え可能医薬品数量, type: long} - {name: ジェネリック医薬品費, type: long} - {name: ジェネリック医薬品のある先発医薬品費, type: long} - {name: ジェネリック医薬品のない先発医薬品費, type: long} filters: - type: add_time to_column: name: create_datetime type: timestamp from_value: mode: upload_time - type: add_time to_column: name: update_datetime type: timestamp from_value: mode: upload_time - type: column add_columns: - {name: create_user, type: string, default: "SYSTEM"} - {name: update_user, type: string, default: "SYSTEM"} - type: rename columns: personal_id: personal_id 対象年度: target_year ジェネリック医薬品数量: generic_drug_quantity 置き換え可能医薬品数量: replaceable_drug_quantity ジェネリック医薬品費: original_drug_cost ジェネリック医薬品のある先発医薬品費: original_drug_cost_with_generic ジェネリック医薬品のない先発医薬品費: original_drug_cost_without_generic out: type: postgresql host: {{ env.DB_HOST }} user: {{ env.DB_USER }} password: {{ env.DB_PASSWORD }} database: {{ env.DB_NAME }} table: raw_yearly_generic_medicine mode: truncate_insert |
Import data from api json -> into the postgresql database
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | in: type: http url: {{ env.API_URL }} method: get params: - {name: limit, value: {{ env.PAGINATE }}} pager: {from_param: offset, pages: {{ env.TOTAL_PAGE }}, start: 0, step: {{ env.PAGINATE }}} flatten_json_array: true parser: charset: UTF-8 newline: LF type: jsonpath columns: - {name: personal_id, type: string} - {name: event_at, type: timestamp, format: '%Y-%m-%dT%H:%M:%S%z'} - {name: event_name, type: string} - {name: flow_type, type: long} - {name: flow_type_name, type: string} - {name: point, type: long} - {name: id, type: string} - {name: deleted_at, type: timestamp, format: '%Y-%m-%dT%H:%M:%S%z'} filters: - type: add_time to_column: name: create_datetime type: timestamp from_value: mode: upload_time - type: add_time to_column: name: update_datetime type: timestamp from_value: mode: upload_time - type: column add_columns: - {name: create_user, type: string, default: "SYSTEM"} - {name: update_user, type: string, default: "SYSTEM"} - {name: delete_flg, type: boolean, default: false} - type: rename columns: id: amulet_id out: type: postgresql host: {{ env.DB_HOST }} user: {{ env.DB_USER }} password: {{ env.DB_PASSWORD }} database: {{ env.DB_NAME }} table: a_point_history mode: merge merge_keys: - amulet_id |
Refer to embulk’s http plugin here: https://github.com/takumakanari/embulk-input-http
summary
Above, I have introduced about Embulk, an excellent data converter. Hope it can help ease your pain
See more at:
https://dev.embulk.org/customization.html
https://qiita.com/tashiro_gaku/items/f7fa0f1a99c759d947a7#configxml に mysql プ ラ グ イ ン 情報 を 追記