[bigdata-086] python3+neo4j 從mysql資料庫讀取記錄然後建立節點和關係寫入到neo4j
阿新 • • 發佈:2019-02-03
1. 測試
1.1web介面 http://tz211:7474/browser/
在這裡執行 MATCH (n) RETURN n,能看到節點和相互關係
一共是3個節點,6個相互關係
1.2 在211執行cypher-shell
./cypher-shell -u neo4j -p 123456
1.3 執行convert-test.py
上述1.1和1.2和1.3是一致的。
1.4 在web節點執行刪除所有節點命令
然後再找查詢節點,發現沒有節點了。符合預期。
在cypher-shell執行查詢結果,也沒有節點,符合預期。
在convert-test執行查詢結果,也沒有節點,符合預期。
1.5 在cypher建立節點
CREATE (:MP {num:"18717917632"});
在三處都能看到節點,正確。
MERGE (:MP {num:"18717917632"});
三處只有一個節點,正確。
MERGE (:MP {num:"18717917666"});
三處均增加了一個節點,正確。
再次執行
MERGE (:MP {num:"18717917666"});
三處沒有變化,正確。
2. 程式碼測試
2.1 參考文件
《The Neo4j Developer Manual v3.2》python版
2.2 session
session是一個容器,它儲存一系列的transaction事務。session從一個池裡獲取到連線。session不是執行緒安全的。
要把session放在一個context block,那麼,當這個block執行完畢,session就可以正確地關閉。程式碼形如:
-----------------------------
def add_person(self, name):
session = self._driver.session()
session.run("CREATE (a:Person {name: $name})", {"name": name})
-----------------------------
session在add_person這個context裡,如果add_persion執行結束,session就會正確關閉。
2.3 transaction事務
事務是一個原子單元,它包括一個或者多個cypher語句。每個事務都必然在一個session裡執行。
執行一個cypher語句,需要兩個資訊:cyphter語句模板; cyphper語句引數。也可以執行無引數cypher,但有引數可以更好的調優。
有三種事務:
A. 自動提交事務 auto-commit transactions
B. 事務函式 Transaction functions
C. 顯式事務 Explicit transactions
注意:只有B在事務出錯的時候可以自動重新執行。
2.3.1 auto-commit transactions
這種事務,是簡單的,形式有限,這種事務只有一條語句,事務失敗後不能自動重放,不能放入causal chain執行。
這種事務,在session.run函式執行,形如:
--------------------------
def add_person(self, name):
session = self._driver.session()
session.run( "CREATE (a:Person {name: $name})", {"name": name} )
--------------------------
這種事務會立刻傳送到資料庫進行處理。
不要在生產環境使用這種事務。如果擔心效能和穩定性,也不要使用這種事務。
2.3.2 事務函式 Transaction functions
這是推薦的事務型別。優先使用這種事務。舉例如下:
--------------------------
def add_person(self, name):
with self._driver.session() as session:
session.write_transaction(lambda tx: self.create_person_node(tx, name))
def create_person_node(self, tx, name):
tx.run("CREATE (a:Person {name: $name})", {"name": name})
return 1
--------------------------
2.3.3 顯式事務
這類事務,是2.3.2的詳細版本,也就是說,顯式生命BEGIN、COMMIT、ROOBACK操作。
這塊需要檢查python的原始碼,具體細節在官方文件裡沒有。
3. 一個完整的將資料以transaction functions的方式寫入到neo4j的demo
從mysql資料庫取記錄,然後建立節點,然後建立關係。
---------------------------
#!/usr/bin/env python3
#!-*- coding:utf-8 -*-
import pymysql
from neo4j.v1 import GraphDatabase
uri = "bolt://10.xx.xx.xx:7687"
driver = GraphDatabase.driver(uri, auth=("neo4j", "888888"))
print("先刪除所有節點和關係")
with driver.session() as session:
session.write_transaction(lambda tx: tx.run("MATCH (n) DETACH DELETE n"))
print("檢查是否為空")
with driver.session() as session:
session.write_transaction(lambda tx: tx.run('MATCH (n) RETURN n'))
conn = pymysql.connect(host='xxxxdb.mysql.rds.aliyuncs.com',
port=3306, user='userxxx', passwd='pswdxxxx',
db='ust', charset='utf8')
c_t = conn.cursor()
sql = 'select query_idcard, query_mobile, query_name, query_bankcard ' \
'from id34_record ' \
'where result_code=\'00\' or result_code=\'000\';'
c_t.execute(sql)
r_t = c_t.fetchall()
nnn = 0
for i in r_t:
# print(i)
if i[0] == None or i[1] == None or i[2] == None or i[3] == None:
#暫時不考慮三要素
continue
if (nnn % 100 == 0):
print('-'*10)
print(nnn)
# print(nnn)
nnn += 1
id = i[0]
mp = i[1]
name = i[2] #暫時不考慮姓名
bankcard = i[3]
#不重複地建立節點
node_type = 'MP'
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run("MERGE (:MP {val:$val})", {'val':mp}))
node_type = 'ID'
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run("MERGE (:ID {val:$val})", {'val':id}))
node_type = 'BANKCARD'
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run("MERGE (:BANKCARD {val:$val})", {'val':bankcard}))
#建立關係
cmd = "MATCH (n:MP) WHERE n.val=\"%s\" \n" \
"MATCH (m:ID) WHERE m.val=\"%s\" \n" \
"MATCH (p:BANKCARD) WHERE p.val=\"%s\" \n" \
"MERGE (n)-[:BIND]->(m) \n" \
"MERGE (m)-[:BIND]->(n) \n" \
"MERGE (n)-[:BIND]->(p) \n" \
"MERGE (p)-[:BIND]->(n) \n" \
"MERGE (m)-[:BIND]->(p) \n" \
"MERGE (p)-[:BIND]->(m) \n" \
%(mp, id, bankcard)
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run(cmd))
1.1web介面 http://tz211:7474/browser/
在這裡執行 MATCH (n) RETURN n,能看到節點和相互關係
一共是3個節點,6個相互關係
1.2 在211執行cypher-shell
./cypher-shell -u neo4j -p 123456
1.3 執行convert-test.py
上述1.1和1.2和1.3是一致的。
1.4 在web節點執行刪除所有節點命令
然後再找查詢節點,發現沒有節點了。符合預期。
在cypher-shell執行查詢結果,也沒有節點,符合預期。
在convert-test執行查詢結果,也沒有節點,符合預期。
1.5 在cypher建立節點
CREATE (:MP {num:"18717917632"});
在三處都能看到節點,正確。
MERGE (:MP {num:"18717917632"});
三處只有一個節點,正確。
MERGE (:MP {num:"18717917666"});
三處均增加了一個節點,正確。
再次執行
MERGE (:MP {num:"18717917666"});
三處沒有變化,正確。
2. 程式碼測試
2.1 參考文件
《The Neo4j Developer Manual v3.2》python版
2.2 session
session是一個容器,它儲存一系列的transaction事務。session從一個池裡獲取到連線。session不是執行緒安全的。
要把session放在一個context block,那麼,當這個block執行完畢,session就可以正確地關閉。程式碼形如:
-----------------------------
def add_person(self, name):
session = self._driver.session()
session.run("CREATE (a:Person {name: $name})", {"name": name})
-----------------------------
session在add_person這個context裡,如果add_persion執行結束,session就會正確關閉。
2.3 transaction事務
事務是一個原子單元,它包括一個或者多個cypher語句。每個事務都必然在一個session裡執行。
執行一個cypher語句,需要兩個資訊:cyphter語句模板; cyphper語句引數。也可以執行無引數cypher,但有引數可以更好的調優。
有三種事務:
A. 自動提交事務 auto-commit transactions
B. 事務函式 Transaction functions
C. 顯式事務 Explicit transactions
注意:只有B在事務出錯的時候可以自動重新執行。
2.3.1 auto-commit transactions
這種事務,是簡單的,形式有限,這種事務只有一條語句,事務失敗後不能自動重放,不能放入causal chain執行。
這種事務,在session.run函式執行,形如:
--------------------------
def add_person(self, name):
session = self._driver.session()
session.run( "CREATE (a:Person {name: $name})", {"name": name} )
--------------------------
這種事務會立刻傳送到資料庫進行處理。
不要在生產環境使用這種事務。如果擔心效能和穩定性,也不要使用這種事務。
2.3.2 事務函式 Transaction functions
這是推薦的事務型別。優先使用這種事務。舉例如下:
--------------------------
def add_person(self, name):
with self._driver.session() as session:
session.write_transaction(lambda tx: self.create_person_node(tx, name))
def create_person_node(self, tx, name):
tx.run("CREATE (a:Person {name: $name})", {"name": name})
return 1
--------------------------
2.3.3 顯式事務
這類事務,是2.3.2的詳細版本,也就是說,顯式生命BEGIN、COMMIT、ROOBACK操作。
這塊需要檢查python的原始碼,具體細節在官方文件裡沒有。
3. 一個完整的將資料以transaction functions的方式寫入到neo4j的demo
從mysql資料庫取記錄,然後建立節點,然後建立關係。
---------------------------
#!/usr/bin/env python3
#!-*- coding:utf-8 -*-
import pymysql
from neo4j.v1 import GraphDatabase
uri = "bolt://10.xx.xx.xx:7687"
driver = GraphDatabase.driver(uri, auth=("neo4j", "888888"))
print("先刪除所有節點和關係")
with driver.session() as session:
session.write_transaction(lambda tx: tx.run("MATCH (n) DETACH DELETE n"))
print("檢查是否為空")
with driver.session() as session:
session.write_transaction(lambda tx: tx.run('MATCH (n) RETURN n'))
conn = pymysql.connect(host='xxxxdb.mysql.rds.aliyuncs.com',
port=3306, user='userxxx', passwd='pswdxxxx',
db='ust', charset='utf8')
c_t = conn.cursor()
sql = 'select query_idcard, query_mobile, query_name, query_bankcard ' \
'from id34_record ' \
'where result_code=\'00\' or result_code=\'000\';'
c_t.execute(sql)
r_t = c_t.fetchall()
nnn = 0
for i in r_t:
# print(i)
if i[0] == None or i[1] == None or i[2] == None or i[3] == None:
#暫時不考慮三要素
continue
if (nnn % 100 == 0):
print('-'*10)
print(nnn)
# print(nnn)
nnn += 1
id = i[0]
mp = i[1]
name = i[2] #暫時不考慮姓名
bankcard = i[3]
#不重複地建立節點
node_type = 'MP'
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run("MERGE (:MP {val:$val})", {'val':mp}))
node_type = 'ID'
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run("MERGE (:ID {val:$val})", {'val':id}))
node_type = 'BANKCARD'
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run("MERGE (:BANKCARD {val:$val})", {'val':bankcard}))
#建立關係
cmd = "MATCH (n:MP) WHERE n.val=\"%s\" \n" \
"MATCH (m:ID) WHERE m.val=\"%s\" \n" \
"MATCH (p:BANKCARD) WHERE p.val=\"%s\" \n" \
"MERGE (n)-[:BIND]->(m) \n" \
"MERGE (m)-[:BIND]->(n) \n" \
"MERGE (n)-[:BIND]->(p) \n" \
"MERGE (p)-[:BIND]->(n) \n" \
"MERGE (m)-[:BIND]->(p) \n" \
"MERGE (p)-[:BIND]->(m) \n" \
%(mp, id, bankcard)
with driver.session() as session:
session.write_transaction(
lambda tx: tx.run(cmd))
---------------------------
4. 建立索引
CREATE INDEX ON :MP(val)
CREATE INDEX ON :ID(val)
CREATE INDEX ON :BANKCARD(val)