Uçtan Uca Apache Nifi Uygulaması
Apache NiFi’nin güçlü veri akışı yönetimi yetenekleri sayesinde, örnek bir uygulama geliştirdim ve bu makalede sizlere bu etkileyici platformun nasıl kullanıldığını göstereceğim. Nifi hakkında daha detaylı bilgi almak isterseniz Apache Nifi Nedir? adlı makalemi okuyabilirsiniz.
Uygulama Mimarisi
Örnek uygulamada, sahte ve rastgele veri üreten REST endpoint’e Nifi ortamından istek atarak gelen verileri veri tabanına aktaracağız.
Kullanacağımız endpoint şu olacak;
https://random-data-api.com/api/v2/users
A) Kurulumlar
Tüm araçları docker üzerinden ayağa kaldırıyoruz.
1.Apache Nifi
docker run --name mynifi --network host -i -v ~/drivers/shared-directory:/opt/nifi/nifi-current/ls-target apache/nifi
Nifi üzerinden veritabanlarına erişim için driver dosyalarına ihtiyaç vardır. Bu sebeple volume vermeniz gerekir.
~/drivers/shared-directory:/opt/nifi/nifi-current/ls-target
2. PostgreSQL
docker run --name docker_postgres -e POSTGRES_PASSWORD=123456 -d -p 5432:5432 -v $HOME/docker/volumes/postgres:/var/lib/postgresql/data postgres
Table Oluşturma
CREATE TABLE IF NOT EXISTS public.users
(
id integer NOT NULL,
uid text COLLATE pg_catalog."default" NOT NULL,
password text COLLATE pg_catalog."default" NOT NULL,
first_name text COLLATE pg_catalog."default" NOT NULL,
last_name text COLLATE pg_catalog."default" NOT NULL,
username text COLLATE pg_catalog."default" NOT NULL,
email text COLLATE pg_catalog."default" NOT NULL,
avatar text COLLATE pg_catalog."default" NOT NULL,
gender text COLLATE pg_catalog."default" NOT NULL,
phone_number text COLLATE pg_catalog."default" NOT NULL,
social_insurance_number integer NOT NULL,
date_of_birth date NOT NULL,
employment text COLLATE pg_catalog."default",
)
CREATE TABLE IF NOT EXISTS public.subscriptions
(
plan text COLLATE pg_catalog."default" NOT NULL,
status text COLLATE pg_catalog."default" NOT NULL,
payment_method text COLLATE pg_catalog."default" NOT NULL,
term text COLLATE pg_catalog."default" NOT NULL,
)
CREATE TABLE IF NOT EXISTS public.creditcard
(
cc_number text COLLATE pg_catalog."default"
)
CREATE TABLE IF NOT EXISTS public.adress
(
city text COLLATE pg_catalog."default",
street_name text COLLATE pg_catalog."default",
street_address text COLLATE pg_catalog."default",
zip_code text COLLATE pg_catalog."default",
state text COLLATE pg_catalog."default",
country text COLLATE pg_catalog."default",
coordinates text COLLATE pg_catalog."default"
)
3.PostgreSQL Driver
Bu driver ile apache nifi postgresql ile haberleşebilecektir.
mkdir jar
cd jar
curl -O -L "https://jdbc.postgresql.org/download/postgresql-42.6.0.jar"
4. Superset
git clone https://github.com/apache/superset.git
cd superset
docker-compose -f docker-compose-non-dev.yml pull
docker-compose -f docker-compose-non-dev.yml up
Veri görselleştirme aracı olan Superset hakkında daha fazla bilgi almak ve kurulum yardım almak isterseniz aşağıdaki adrese başvurun.
B) Ortamlara Erişim
Nifi ortamına erişebilmek için öncelikle user ve password bilgilerine erişmek gereklidir. Bu bilgiler nifi ayağa kalkarken logların arasında bulunur.
Generated Username [USERNAME]
Generated Password [PASSWORD]
Daha sonra bu erişim bilgileri ile arayüze erişim sağlanır.
Artık sol üstte bulunan processorleri sürükle bırak yaparak dataflow oluşturulabilir.
C) Dataflow Oluşturma
İlk olarak endpointe istek atacağız, daha sonra cevap olarak gelen veriyi uygun ETL sürecine sokarak veritabanına kaydedeceğiz.
A. Endpointe İstek Atmak
Endpoint’de istek atıp cevap döndürmek için InvokeHTTP processor’ünü kullanacağız.
Bu processor HTTP URL parametresine yazılan adrese HTTP Method’da seçili olan istek türünden istek atar. Örnek Json:
{
"id":1575,
"uid":"f4889144-613e-4e0b-9d79-ea3fa0d293b4",
"password":"PlmGU8pgE3",
"first_name":"Jay",
"last_name":"O'Kon",
"username":"jay.o'kon",
"email":"jay.o'kon@email.com",
"avatar":"https://robohash.org/suscipitutadipisci.png?size=300x300\u0026set=set1",
"gender":"Genderfluid",
"phone_number":"+370 (130) 836-6108 x9251",
"social_insurance_number":"122106982",
"date_of_birth":"1961-05-04",
"employment":{
"title":"International Hospitality Architect",
"key_skill":"Leadership"
},
"address":{
"city":"East Vina",
"street_name":"Davis Mall",
"street_address":"9214 Stacie Square",
"zip_code":"41334",
"state":"Pennsylvania",
"country":"United States",
"coordinates":{
"lat":-29.476027673621594,
"lng":21.52274723252242
}
},
"credit_card":{
"cc_number":"5216-5092-0649-6639"
},
"subscription":{
"plan":"Business",
"status":"Blocked",
"payment_method":"Cash",
"term":"Annual"
}
}
B. Json Parse İşlemleri
Bu aşamada ilk olarak gelen veriyi incelememiz gerekiyor. Gelen veriyi parse edebilmek için SplitRecord Processorünü kullanacağız. InvokeHTTP ile bu nodu birbirine bağlamanız gerekmetedir.
Split record, Record Reader & Record Writer parametrelerine sahiptir. Bunlar gelen verinin türüne göre değiştirilmelidir.
SplitRecord işlemi sonucunda, key:value şeklinde gelen veri Json yapısına dönüştü.
Gelen veriyi incelediğimizde credit_card,subscription,adresss olarak childlara bölündüğünü görüyoruz. Veri tabanında buna benzer şekilde kaydedeceğiz.
Şimdi ise SplitJson ve EvulateJsonPath processorlerini kullanarak gelen veriyi childlarına böleceğiz.
SplitJson: Veri içerisinden child olanları ayırmak için kullanıyoruz.
JsonPath Expression: Veriyi nasıl pars edeceğimizi belirler.
- “$” işareti gelen veriyi temsil eder.
- “..” ise JSON belgesi içindeki tüm derinliklerde arama yapmayı ifade eder.
EvulateJsonPath: Childlar dışında kalan verileri almak için kullanıyoruz.
JsonPath Expression parametresine aşağıdaki değerleri giriyoruz.
$.[0][‘id’, ‘uid’, ‘password’, ‘first_name’, ‘last_name’, ‘username’, ‘email’, ‘avatar’, ‘gender’, ‘phone_number’, ‘social_insurance_number’, ‘date_of_birth’, ‘employment’]
Tüm childlara bölüm işlemi tamamlandıktan sonra dataflow şuna benzeyecek.
- Parse işlemi sonucunda bazı verilerin Json tipinde olmadığını fark ettik. Bu sebeple tekrar splitrecord kullanarak json objesine çeviriyoruz.
C. Veritabanı İşlemleri
Hazırlanan Json tipindeki veri, PutDatabaseRecord processor’ü kullanılarak veri tabanına yazılır.
Record Reader: Gelen verinin hangi türde olduğunu belirler
Statement Type: Veritabanına hangi CRUD işleminin yapılacağını belirler.
Table Name: Verilerin hangi tabloya yazılacağını belirler.
Database Connection Pooling Service: veritabanı erişim bilgilerini içerir.
Controller Servisler
PostgreSQL bağlantı bilgileri için DBCPConnectionPool aktif edilmelidir.
Database Connection URL: jdbc:postgresql://{HOST_IP}:{HOST_PORT}/{DATABASE_NAME}
Database Driver Class Name: org.postgresql.Driver
Database Driver Location(s): /opt/nifi/nifi-current/ls-target
Her şey tamamlandı, artık dataflowu çalıştırabiliriz.
Son olarak dataflow’u template olarak import etmek isterseniz aşağıdaki repodan ulaşabilirsiniz.
Ek Kaynaklar